Skip to main content

fraiseql_core/db/postgres/
adapter.rs

1//! PostgreSQL database adapter implementation.
2
3use async_trait::async_trait;
4use deadpool_postgres::{Config, ManagerConfig, Pool, RecyclingMethod, Runtime};
5use tokio_postgres::{NoTls, Row};
6
7use super::where_generator::PostgresWhereGenerator;
8use crate::{
9    db::{
10        identifier::quote_postgres_identifier,
11        traits::DatabaseAdapter,
12        types::{DatabaseType, JsonbValue, PoolMetrics, QueryParam},
13        where_clause::WhereClause,
14    },
15    error::{FraiseQLError, Result},
16    schema::SqlProjectionHint,
17};
18
19/// Default maximum pool size for PostgreSQL connections.
20/// Increased from 10 to 25 to prevent pool exhaustion under concurrent
21/// nested query load (fixes Issue #41).
22const DEFAULT_POOL_SIZE: usize = 25;
23
24/// Maximum retries for connection acquisition with exponential backoff.
25const MAX_CONNECTION_RETRIES: u32 = 3;
26
27/// Base delay in milliseconds for connection retry backoff.
28const CONNECTION_RETRY_DELAY_MS: u64 = 50;
29
30/// PostgreSQL database adapter with connection pooling.
31///
32/// Uses `deadpool-postgres` for connection pooling and `tokio-postgres` for async queries.
33///
34/// # Example
35///
36/// ```rust,no_run
37/// use fraiseql_core::db::postgres::PostgresAdapter;
38/// use fraiseql_core::db::{DatabaseAdapter, WhereClause, WhereOperator};
39/// use serde_json::json;
40///
41/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
42/// // Create adapter with connection string
43/// let adapter = PostgresAdapter::new("postgresql://localhost/mydb").await?;
44///
45/// // Execute query
46/// let where_clause = WhereClause::Field {
47///     path: vec!["email".to_string()],
48///     operator: WhereOperator::Icontains,
49///     value: json!("example.com"),
50/// };
51///
52/// let results = adapter
53///     .execute_where_query("v_user", Some(&where_clause), Some(10), None)
54///     .await?;
55///
56/// println!("Found {} users", results.len());
57/// # Ok(())
58/// # }
59/// ```
60#[derive(Clone)]
61pub struct PostgresAdapter {
62    pool: Pool,
63}
64
65impl PostgresAdapter {
66    /// Create new PostgreSQL adapter with default pool configuration.
67    ///
68    /// # Arguments
69    ///
70    /// * `connection_string` - PostgreSQL connection string (e.g., "postgresql://localhost/mydb")
71    ///
72    /// # Errors
73    ///
74    /// Returns `FraiseQLError::ConnectionPool` if pool creation fails.
75    ///
76    /// # Example
77    ///
78    /// ```rust,no_run
79    /// # use fraiseql_core::db::postgres::PostgresAdapter;
80    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
81    /// let adapter = PostgresAdapter::new("postgresql://localhost/mydb").await?;
82    /// # Ok(())
83    /// # }
84    /// ```
85    pub async fn new(connection_string: &str) -> Result<Self> {
86        Self::with_pool_size(connection_string, DEFAULT_POOL_SIZE).await
87    }
88
89    /// Create new PostgreSQL adapter with custom pool configuration.
90    ///
91    /// # Arguments
92    ///
93    /// * `connection_string` - PostgreSQL connection string
94    /// * `min_size` - Minimum size hint (not enforced by deadpool-postgres)
95    /// * `max_size` - Maximum number of connections in pool
96    ///
97    /// # Errors
98    ///
99    /// Returns `FraiseQLError::ConnectionPool` if pool creation fails.
100    ///
101    /// # Note
102    ///
103    /// `min_size` is accepted for API compatibility but deadpool-postgres uses
104    /// lazy initialization with dynamic pool sizing up to `max_size`.
105    pub async fn with_pool_config(
106        connection_string: &str,
107        _min_size: usize,
108        max_size: usize,
109    ) -> Result<Self> {
110        Self::with_pool_size(connection_string, max_size).await
111    }
112
113    /// Create new PostgreSQL adapter with custom pool size.
114    ///
115    /// # Arguments
116    ///
117    /// * `connection_string` - PostgreSQL connection string
118    /// * `max_size` - Maximum number of connections in pool
119    ///
120    /// # Errors
121    ///
122    /// Returns `FraiseQLError::ConnectionPool` if pool creation fails.
123    pub async fn with_pool_size(connection_string: &str, max_size: usize) -> Result<Self> {
124        let mut cfg = Config::new();
125        cfg.url = Some(connection_string.to_string());
126        cfg.manager = Some(ManagerConfig {
127            recycling_method: RecyclingMethod::Fast,
128        });
129        cfg.pool = Some(deadpool_postgres::PoolConfig::new(max_size));
130
131        let pool = cfg.create_pool(Some(Runtime::Tokio1), NoTls).map_err(|e| {
132            FraiseQLError::ConnectionPool {
133                message: format!("Failed to create connection pool: {e}"),
134            }
135        })?;
136
137        // Test connection
138        let client = pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
139            message: format!("Failed to acquire connection: {e}"),
140        })?;
141
142        client.query("SELECT 1", &[]).await.map_err(|e| FraiseQLError::Database {
143            message:   format!("Failed to connect to database: {e}"),
144            sql_state: e.code().map(|c| c.code().to_string()),
145        })?;
146
147        Ok(Self { pool })
148    }
149
150    /// Get a reference to the internal connection pool.
151    ///
152    /// This allows sharing the pool with other components like `PostgresIntrospector`.
153    #[must_use]
154    pub fn pool(&self) -> &Pool {
155        &self.pool
156    }
157
158    /// Execute raw SQL query and return JSONB rows.
159    ///
160    /// # Errors
161    ///
162    /// Returns `FraiseQLError::Database` on query execution failure.
163    async fn execute_raw(
164        &self,
165        sql: &str,
166        params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
167    ) -> Result<Vec<JsonbValue>> {
168        let client = self.acquire_connection_with_retry().await?;
169
170        let rows: Vec<Row> =
171            client.query(sql, params).await.map_err(|e| FraiseQLError::Database {
172                message:   format!("Query execution failed: {e}"),
173                sql_state: e.code().map(|c| c.code().to_string()),
174            })?;
175
176        let results = rows
177            .into_iter()
178            .map(|row| {
179                let data: serde_json::Value = row.get(0);
180                JsonbValue::new(data)
181            })
182            .collect();
183
184        Ok(results)
185    }
186
187    /// Acquire a connection from the pool with retry logic.
188    ///
189    /// Implements exponential backoff retry when the pool is exhausted.
190    /// This prevents transient pool exhaustion from causing query failures
191    /// under concurrent load (fixes Issue #41).
192    ///
193    /// # Errors
194    ///
195    /// Returns `FraiseQLError::ConnectionPool` if all retries are exhausted.
196    async fn acquire_connection_with_retry(&self) -> Result<deadpool_postgres::Client> {
197        let mut last_error = None;
198
199        for attempt in 0..MAX_CONNECTION_RETRIES {
200            match self.pool.get().await {
201                Ok(client) => {
202                    if attempt > 0 {
203                        tracing::info!(
204                            "Successfully acquired connection after {} retries",
205                            attempt
206                        );
207                    }
208                    return Ok(client);
209                },
210                Err(e) => {
211                    last_error = Some(e);
212                    if attempt < MAX_CONNECTION_RETRIES - 1 {
213                        let delay = CONNECTION_RETRY_DELAY_MS * (u64::from(attempt) + 1);
214                        tracing::warn!(
215                            "Connection pool exhausted (attempt {}/{}), retrying in {}ms...",
216                            attempt + 1,
217                            MAX_CONNECTION_RETRIES,
218                            delay
219                        );
220                        tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
221                    }
222                },
223            }
224        }
225
226        // All retries exhausted - log detailed pool state
227        let pool_metrics = self.pool_metrics();
228        tracing::error!(
229            "Failed to acquire connection after {} retries. Pool state: available={}, active={}, total={}",
230            MAX_CONNECTION_RETRIES,
231            pool_metrics.idle_connections,
232            pool_metrics.active_connections,
233            pool_metrics.total_connections
234        );
235
236        Err(FraiseQLError::ConnectionPool {
237            message: format!(
238                "Failed to acquire connection after {} retries: {}. Pool exhausted (available={}/{}). Consider increasing pool size or reducing concurrent load.",
239                MAX_CONNECTION_RETRIES,
240                last_error.unwrap(),
241                pool_metrics.idle_connections,
242                pool_metrics.total_connections
243            ),
244        })
245    }
246
247    /// Execute query with SQL field projection optimization.
248    ///
249    /// Uses the provided `SqlProjectionHint` to generate optimized SQL that projects
250    /// only the requested fields from the JSONB column, reducing network payload and
251    /// JSON deserialization overhead.
252    ///
253    /// # Arguments
254    ///
255    /// * `view` - View/table name to query
256    /// * `projection` - Optional SQL projection hint with field list
257    /// * `where_clause` - Optional WHERE clause for filtering
258    /// * `limit` - Optional row limit
259    ///
260    /// # Returns
261    ///
262    /// Vector of projected JSONB rows with only the requested fields
263    ///
264    /// # Errors
265    ///
266    /// Returns `FraiseQLError::Database` on query execution failure.
267    ///
268    /// # Example
269    ///
270    /// ```rust,ignore
271    /// let projection = SqlProjectionHint {
272    ///     database: "postgresql".to_string(),
273    ///     projection_template: "...".to_string(),
274    ///     estimated_reduction_percent: 75,
275    /// };
276    ///
277    /// let results = adapter
278    ///     .execute_with_projection("v_user", Some(&projection), None, Some(10))
279    ///     .await?;
280    /// ```
281    pub async fn execute_with_projection(
282        &self,
283        view: &str,
284        projection: Option<&SqlProjectionHint>,
285        where_clause: Option<&WhereClause>,
286        limit: Option<u32>,
287    ) -> Result<Vec<JsonbValue>> {
288        // If no projection, fall back to standard query
289        if projection.is_none() {
290            return self.execute_where_query(view, where_clause, limit, None).await;
291        }
292
293        let projection = projection.unwrap();
294
295        // Build SQL with projection
296        // The projection_template is expected to be the SELECT clause with projection SQL
297        // e.g., "jsonb_build_object('id', data->>'id', 'email', data->>'email')"
298        let mut sql = format!(
299            "SELECT {} FROM {}",
300            projection.projection_template,
301            quote_postgres_identifier(view)
302        );
303
304        // Add WHERE clause if present
305        if let Some(clause) = where_clause {
306            let generator = PostgresWhereGenerator::new();
307            let (where_sql, where_params) = generator.generate(clause)?;
308            sql.push_str(" WHERE ");
309            sql.push_str(&where_sql);
310
311            // Add parameterized LIMIT
312            let mut params = where_params;
313            let mut param_count = params.len();
314
315            if let Some(lim) = limit {
316                param_count += 1;
317                sql.push_str(&format!(" LIMIT ${param_count}"));
318                params.push(serde_json::Value::Number(lim.into()));
319            }
320
321            // Convert JSON values to QueryParam (preserves types)
322            let typed_params: Vec<QueryParam> = params.into_iter().map(QueryParam::from).collect();
323
324            tracing::debug!("SQL with projection = {}", sql);
325            tracing::debug!("typed_params = {:?}", typed_params);
326
327            // Create references to QueryParam for ToSql
328            let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = typed_params
329                .iter()
330                .map(|p| p as &(dyn tokio_postgres::types::ToSql + Sync))
331                .collect();
332
333            self.execute_raw(&sql, &param_refs).await
334        } else {
335            // No WHERE clause
336            let mut params: Vec<serde_json::Value> = vec![];
337            let mut param_count = 0;
338
339            if let Some(lim) = limit {
340                param_count += 1;
341                sql.push_str(&format!(" LIMIT ${param_count}"));
342                params.push(serde_json::Value::Number(lim.into()));
343            }
344
345            // Convert JSON values to QueryParam (preserves types)
346            let typed_params: Vec<QueryParam> = params.into_iter().map(QueryParam::from).collect();
347
348            // Create references to QueryParam for ToSql
349            let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = typed_params
350                .iter()
351                .map(|p| p as &(dyn tokio_postgres::types::ToSql + Sync))
352                .collect();
353
354            self.execute_raw(&sql, &param_refs).await
355        }
356    }
357}
358
359#[async_trait]
360impl DatabaseAdapter for PostgresAdapter {
361    async fn execute_with_projection(
362        &self,
363        view: &str,
364        projection: Option<&SqlProjectionHint>,
365        where_clause: Option<&WhereClause>,
366        limit: Option<u32>,
367    ) -> Result<Vec<JsonbValue>> {
368        self.execute_with_projection(view, projection, where_clause, limit).await
369    }
370
371    async fn execute_where_query(
372        &self,
373        view: &str,
374        where_clause: Option<&WhereClause>,
375        limit: Option<u32>,
376        offset: Option<u32>,
377    ) -> Result<Vec<JsonbValue>> {
378        // Build base query
379        let mut sql = format!("SELECT data FROM {}", quote_postgres_identifier(view));
380
381        // Collect WHERE clause params (if any)
382        let mut typed_params: Vec<QueryParam> = Vec::new();
383        let mut param_count = 0;
384
385        // Add WHERE clause if present
386        if let Some(clause) = where_clause {
387            let generator = PostgresWhereGenerator::new();
388            let (where_sql, where_params) = generator.generate(clause)?;
389            sql.push_str(" WHERE ");
390            sql.push_str(&where_sql);
391
392            // Convert WHERE clause JSON values to QueryParam
393            typed_params = where_params.into_iter().map(QueryParam::from).collect();
394            param_count = typed_params.len();
395        }
396
397        // Add LIMIT as BigInt (PostgreSQL requires integer type for LIMIT)
398        if let Some(lim) = limit {
399            param_count += 1;
400            sql.push_str(&format!(" LIMIT ${param_count}"));
401            typed_params.push(QueryParam::BigInt(i64::from(lim)));
402        }
403
404        // Add OFFSET as BigInt (PostgreSQL requires integer type for OFFSET)
405        if let Some(off) = offset {
406            param_count += 1;
407            sql.push_str(&format!(" OFFSET ${param_count}"));
408            typed_params.push(QueryParam::BigInt(i64::from(off)));
409        }
410
411        // Create references to QueryParam for ToSql
412        let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = typed_params
413            .iter()
414            .map(|p| p as &(dyn tokio_postgres::types::ToSql + Sync))
415            .collect();
416
417        self.execute_raw(&sql, &param_refs).await
418    }
419
420    fn database_type(&self) -> DatabaseType {
421        DatabaseType::PostgreSQL
422    }
423
424    async fn health_check(&self) -> Result<()> {
425        // Use retry logic for health check to avoid false negatives during pool exhaustion
426        let client = self.acquire_connection_with_retry().await?;
427
428        client.query("SELECT 1", &[]).await.map_err(|e| FraiseQLError::Database {
429            message:   format!("Health check failed: {e}"),
430            sql_state: e.code().map(|c| c.code().to_string()),
431        })?;
432
433        Ok(())
434    }
435
436    fn pool_metrics(&self) -> PoolMetrics {
437        let status = self.pool.status();
438
439        PoolMetrics {
440            total_connections:  status.size as u32,
441            idle_connections:   status.available as u32,
442            active_connections: (status.size - status.available) as u32,
443            waiting_requests:   status.waiting as u32,
444        }
445    }
446
447    async fn execute_raw_query(
448        &self,
449        sql: &str,
450    ) -> Result<Vec<std::collections::HashMap<String, serde_json::Value>>> {
451        // Use retry logic for connection acquisition
452        let client = self.acquire_connection_with_retry().await?;
453
454        let rows: Vec<Row> = client.query(sql, &[]).await.map_err(|e| FraiseQLError::Database {
455            message:   format!("Query execution failed: {e}"),
456            sql_state: e.code().map(|c| c.code().to_string()),
457        })?;
458
459        // Convert each row to HashMap<String, Value>
460        let results: Vec<std::collections::HashMap<String, serde_json::Value>> = rows
461            .into_iter()
462            .map(|row| {
463                let mut map = std::collections::HashMap::new();
464
465                // Iterate over all columns in the row
466                for (idx, column) in row.columns().iter().enumerate() {
467                    let column_name = column.name().to_string();
468
469                    // Try to extract value based on PostgreSQL type
470                    let value: serde_json::Value = if let Ok(v) = row.try_get::<_, i32>(idx) {
471                        serde_json::json!(v)
472                    } else if let Ok(v) = row.try_get::<_, i64>(idx) {
473                        serde_json::json!(v)
474                    } else if let Ok(v) = row.try_get::<_, f64>(idx) {
475                        serde_json::json!(v)
476                    } else if let Ok(v) = row.try_get::<_, String>(idx) {
477                        serde_json::json!(v)
478                    } else if let Ok(v) = row.try_get::<_, bool>(idx) {
479                        serde_json::json!(v)
480                    } else if let Ok(v) = row.try_get::<_, serde_json::Value>(idx) {
481                        v
482                    } else {
483                        // Fallback: NULL
484                        serde_json::Value::Null
485                    };
486
487                    map.insert(column_name, value);
488                }
489
490                map
491            })
492            .collect();
493
494        Ok(results)
495    }
496
497    async fn execute_function_call(
498        &self,
499        function_name: &str,
500        args: &[serde_json::Value],
501    ) -> Result<Vec<std::collections::HashMap<String, serde_json::Value>>> {
502        // Build: SELECT * FROM fn_name($1, $2, ...)
503        let placeholders: Vec<String> =
504            (1..=args.len()).map(|i| format!("${i}")).collect();
505        let sql = format!(
506            "SELECT * FROM {function_name}({})",
507            placeholders.join(", ")
508        );
509
510        let client = self.acquire_connection_with_retry().await?;
511
512        // Bind each JSON argument as a text parameter (PostgreSQL can cast text→jsonb)
513        let params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = args
514            .iter()
515            .map(|v| v as &(dyn tokio_postgres::types::ToSql + Sync))
516            .collect();
517
518        let rows: Vec<Row> =
519            client.query(sql.as_str(), params.as_slice()).await.map_err(|e| {
520                FraiseQLError::Database {
521                    message:   format!("Function call {function_name} failed: {e}"),
522                    sql_state: e.code().map(|c| c.code().to_string()),
523                }
524            })?;
525
526        let results = rows
527            .into_iter()
528            .map(|row| {
529                let mut map = std::collections::HashMap::new();
530                for (idx, column) in row.columns().iter().enumerate() {
531                    let column_name = column.name().to_string();
532                    let value: serde_json::Value =
533                        if let Ok(v) = row.try_get::<_, i32>(idx) {
534                            serde_json::json!(v)
535                        } else if let Ok(v) = row.try_get::<_, i64>(idx) {
536                            serde_json::json!(v)
537                        } else if let Ok(v) = row.try_get::<_, f64>(idx) {
538                            serde_json::json!(v)
539                        } else if let Ok(v) = row.try_get::<_, bool>(idx) {
540                            serde_json::json!(v)
541                        } else if let Ok(v) = row.try_get::<_, serde_json::Value>(idx) {
542                            v
543                        } else if let Ok(v) = row.try_get::<_, String>(idx) {
544                            serde_json::json!(v)
545                        } else {
546                            serde_json::Value::Null
547                        };
548                    map.insert(column_name, value);
549                }
550                map
551            })
552            .collect();
553
554        Ok(results)
555    }
556}
557
558/// PostgreSQL integration tests.
559///
560/// These tests require a running PostgreSQL database with test data.
561///
562/// ## Running the tests
563///
564/// ```bash
565/// # Start test database
566/// docker compose -f docker-compose.test.yml up -d postgres-test
567///
568/// # Run tests with the test-postgres feature
569/// cargo test -p fraiseql-core --features test-postgres db::postgres::adapter::tests
570///
571/// # Or run all tests including ignored ones (legacy method)
572/// cargo test -p fraiseql-core -- --ignored
573///
574/// # Stop test database
575/// docker compose -f docker-compose.test.yml down
576/// ```
577#[cfg(all(test, feature = "test-postgres"))]
578mod tests {
579    use serde_json::json;
580
581    use super::*;
582    use crate::db::{WhereClause, WhereOperator};
583
584    const TEST_DB_URL: &str =
585        "postgresql://fraiseql_test:fraiseql_test_password@localhost:5433/test_fraiseql";
586
587    // Helper to create test adapter
588    async fn create_test_adapter() -> PostgresAdapter {
589        PostgresAdapter::new(TEST_DB_URL)
590            .await
591            .expect("Failed to create test adapter - is PostgreSQL running? Use: docker compose -f docker-compose.test.yml up -d postgres-test")
592    }
593
594    // ========================================================================
595    // Connection & Adapter Tests
596    // ========================================================================
597
598    #[tokio::test]
599    async fn test_adapter_creation() {
600        let adapter = create_test_adapter().await;
601        let metrics = adapter.pool_metrics();
602        assert!(metrics.total_connections > 0);
603        assert_eq!(adapter.database_type(), DatabaseType::PostgreSQL);
604    }
605
606    #[tokio::test]
607    async fn test_adapter_with_custom_pool_size() {
608        let adapter = PostgresAdapter::with_pool_size(TEST_DB_URL, 5)
609            .await
610            .expect("Failed to create adapter");
611
612        // Pool starts with 1 connection and grows on demand up to max_size
613        let metrics = adapter.pool_metrics();
614        assert!(metrics.total_connections >= 1, "Pool should have at least 1 connection");
615        assert!(metrics.total_connections <= 5, "Pool should not exceed max_size of 5");
616    }
617
618    #[tokio::test]
619    async fn test_health_check() {
620        let adapter = create_test_adapter().await;
621        adapter.health_check().await.expect("Health check failed");
622    }
623
624    #[tokio::test]
625    async fn test_pool_metrics() {
626        let adapter = create_test_adapter().await;
627        let metrics = adapter.pool_metrics();
628
629        assert!(metrics.total_connections > 0);
630        assert!(metrics.idle_connections <= metrics.total_connections);
631        assert_eq!(
632            metrics.active_connections,
633            metrics.total_connections - metrics.idle_connections
634        );
635    }
636
637    // ========================================================================
638    // Simple Query Tests (No WHERE Clause)
639    // ========================================================================
640
641    #[tokio::test]
642    async fn test_query_all_users() {
643        let adapter = create_test_adapter().await;
644
645        let results = adapter
646            .execute_where_query("v_user", None, None, None)
647            .await
648            .expect("Failed to query users");
649
650        assert_eq!(results.len(), 5, "Should have 5 test users");
651
652        // Verify JSONB structure
653        let first_user = results[0].as_value();
654        assert!(first_user.get("id").is_some());
655        assert!(first_user.get("email").is_some());
656        assert!(first_user.get("name").is_some());
657    }
658
659    #[tokio::test]
660    async fn test_query_all_posts() {
661        let adapter = create_test_adapter().await;
662
663        let results = adapter
664            .execute_where_query("v_post", None, None, None)
665            .await
666            .expect("Failed to query posts");
667
668        assert_eq!(results.len(), 4, "Should have 4 test posts");
669
670        // Verify nested author object
671        let first_post = results[0].as_value();
672        assert!(first_post.get("author").is_some());
673        assert!(first_post["author"].get("name").is_some());
674    }
675
676    // ========================================================================
677    // WHERE Clause Tests - Comparison Operators
678    // ========================================================================
679
680    #[tokio::test]
681    async fn test_where_eq() {
682        let adapter = create_test_adapter().await;
683
684        let where_clause = WhereClause::Field {
685            path:     vec!["email".to_string()],
686            operator: WhereOperator::Eq,
687            value:    json!("alice@example.com"),
688        };
689
690        let results = adapter
691            .execute_where_query("v_user", Some(&where_clause), None, None)
692            .await
693            .expect("Failed to execute query");
694
695        assert_eq!(results.len(), 1);
696        assert_eq!(results[0].as_value()["email"], "alice@example.com");
697    }
698
699    #[tokio::test]
700    async fn test_where_neq() {
701        let adapter = create_test_adapter().await;
702
703        let where_clause = WhereClause::Field {
704            path:     vec!["role".to_string()],
705            operator: WhereOperator::Neq,
706            value:    json!("user"),
707        };
708
709        let results = adapter
710            .execute_where_query("v_user", Some(&where_clause), None, None)
711            .await
712            .expect("Failed to execute query");
713
714        // Should return admin and moderator (not regular users)
715        assert!(results.len() >= 2);
716        for result in &results {
717            assert_ne!(result.as_value()["role"], "user");
718        }
719    }
720
721    #[tokio::test]
722    async fn test_where_gt() {
723        let adapter = create_test_adapter().await;
724
725        let where_clause = WhereClause::Field {
726            path:     vec!["age".to_string()],
727            operator: WhereOperator::Gt,
728            value:    json!(30),
729        };
730
731        let results = adapter
732            .execute_where_query("v_user", Some(&where_clause), None, None)
733            .await
734            .expect("Failed to execute query");
735
736        assert!(!results.is_empty(), "Should return at least one result");
737        assert_eq!(results.len(), 1, "Should return exactly 1 user (Charlie with age 35)");
738
739        for result in &results {
740            let age = result.as_value()["age"].as_i64().unwrap();
741            assert!(age > 30, "Age should be > 30, but got {}", age);
742        }
743    }
744
745    #[tokio::test]
746    async fn test_where_gte() {
747        let adapter = create_test_adapter().await;
748
749        let where_clause = WhereClause::Field {
750            path:     vec!["age".to_string()],
751            operator: WhereOperator::Gte,
752            value:    json!(30),
753        };
754
755        let results = adapter
756            .execute_where_query("v_user", Some(&where_clause), None, None)
757            .await
758            .expect("Failed to execute query");
759
760        for result in &results {
761            let age = result.as_value()["age"].as_i64().unwrap();
762            assert!(age >= 30);
763        }
764    }
765
766    // ========================================================================
767    // WHERE Clause Tests - String Operators
768    // ========================================================================
769
770    #[tokio::test]
771    async fn test_where_icontains() {
772        let adapter = create_test_adapter().await;
773
774        let where_clause = WhereClause::Field {
775            path:     vec!["email".to_string()],
776            operator: WhereOperator::Icontains,
777            value:    json!("example.com"),
778        };
779
780        let results = adapter
781            .execute_where_query("v_user", Some(&where_clause), None, None)
782            .await
783            .expect("Failed to execute query");
784
785        assert!(results.len() >= 3);
786        for result in &results {
787            let email = result.as_value()["email"].as_str().unwrap();
788            assert!(email.to_lowercase().contains("example.com"));
789        }
790    }
791
792    #[tokio::test]
793    async fn test_where_startswith() {
794        let adapter = create_test_adapter().await;
795
796        let where_clause = WhereClause::Field {
797            path:     vec!["name".to_string()],
798            operator: WhereOperator::Startswith,
799            value:    json!("Alice"),
800        };
801
802        let results = adapter
803            .execute_where_query("v_user", Some(&where_clause), None, None)
804            .await
805            .expect("Failed to execute query");
806
807        assert_eq!(results.len(), 1);
808        assert!(results[0].as_value()["name"].as_str().unwrap().starts_with("Alice"));
809    }
810
811    // ========================================================================
812    // WHERE Clause Tests - Logical Operators
813    // ========================================================================
814
815    #[tokio::test]
816    async fn test_where_and() {
817        let adapter = create_test_adapter().await;
818
819        let where_clause = WhereClause::And(vec![
820            WhereClause::Field {
821                path:     vec!["active".to_string()],
822                operator: WhereOperator::Eq,
823                value:    json!(true),
824            },
825            WhereClause::Field {
826                path:     vec!["age".to_string()],
827                operator: WhereOperator::Gte,
828                value:    json!(25),
829            },
830        ]);
831
832        let results = adapter
833            .execute_where_query("v_user", Some(&where_clause), None, None)
834            .await
835            .expect("Failed to execute query");
836
837        for result in &results {
838            assert_eq!(result.as_value()["active"], true);
839            let age = result.as_value()["age"].as_i64().unwrap();
840            assert!(age >= 25);
841        }
842    }
843
844    #[tokio::test]
845    async fn test_where_or() {
846        let adapter = create_test_adapter().await;
847
848        let where_clause = WhereClause::Or(vec![
849            WhereClause::Field {
850                path:     vec!["role".to_string()],
851                operator: WhereOperator::Eq,
852                value:    json!("admin"),
853            },
854            WhereClause::Field {
855                path:     vec!["role".to_string()],
856                operator: WhereOperator::Eq,
857                value:    json!("moderator"),
858            },
859        ]);
860
861        let results = adapter
862            .execute_where_query("v_user", Some(&where_clause), None, None)
863            .await
864            .expect("Failed to execute query");
865
866        assert!(results.len() >= 2);
867        for result in &results {
868            let role = result.as_value()["role"].as_str().unwrap();
869            assert!(role == "admin" || role == "moderator");
870        }
871    }
872
873    #[tokio::test]
874    async fn test_where_not() {
875        let adapter = create_test_adapter().await;
876
877        let where_clause = WhereClause::Not(Box::new(WhereClause::Field {
878            path:     vec!["active".to_string()],
879            operator: WhereOperator::Eq,
880            value:    json!(true),
881        }));
882
883        let results = adapter
884            .execute_where_query("v_user", Some(&where_clause), None, None)
885            .await
886            .expect("Failed to execute query");
887
888        for result in &results {
889            assert_ne!(result.as_value()["active"], json!(true));
890        }
891    }
892
893    // ========================================================================
894    // WHERE Clause Tests - Array Operators
895    // ========================================================================
896
897    #[tokio::test]
898    async fn test_where_in() {
899        let adapter = create_test_adapter().await;
900
901        let where_clause = WhereClause::Field {
902            path:     vec!["role".to_string()],
903            operator: WhereOperator::In,
904            value:    json!(["admin", "moderator"]),
905        };
906
907        let results = adapter
908            .execute_where_query("v_user", Some(&where_clause), None, None)
909            .await
910            .expect("Failed to execute query");
911
912        assert!(results.len() >= 2);
913        for result in &results {
914            let role = result.as_value()["role"].as_str().unwrap();
915            assert!(role == "admin" || role == "moderator");
916        }
917    }
918
919    // ========================================================================
920    // Pagination Tests
921    // ========================================================================
922
923    #[tokio::test]
924    async fn test_limit() {
925        let adapter = create_test_adapter().await;
926
927        let results = adapter
928            .execute_where_query("v_user", None, Some(2), None)
929            .await
930            .expect("Failed to execute query");
931
932        assert_eq!(results.len(), 2);
933    }
934
935    #[tokio::test]
936    async fn test_offset() {
937        let adapter = create_test_adapter().await;
938
939        let results_all = adapter
940            .execute_where_query("v_user", None, None, None)
941            .await
942            .expect("Failed to execute query");
943
944        let results_offset = adapter
945            .execute_where_query("v_user", None, None, Some(2))
946            .await
947            .expect("Failed to execute query");
948
949        assert_eq!(results_offset.len(), results_all.len() - 2);
950    }
951
952    #[tokio::test]
953    async fn test_limit_and_offset() {
954        let adapter = create_test_adapter().await;
955
956        let results = adapter
957            .execute_where_query("v_user", None, Some(2), Some(1))
958            .await
959            .expect("Failed to execute query");
960
961        assert_eq!(results.len(), 2);
962    }
963
964    // ========================================================================
965    // Nested Object Tests
966    // ========================================================================
967
968    #[tokio::test]
969    async fn test_nested_object_query() {
970        let adapter = create_test_adapter().await;
971
972        let where_clause = WhereClause::Field {
973            path:     vec!["metadata".to_string(), "city".to_string()],
974            operator: WhereOperator::Eq,
975            value:    json!("Paris"),
976        };
977
978        let results = adapter
979            .execute_where_query("v_user", Some(&where_clause), None, None)
980            .await
981            .expect("Failed to execute query");
982
983        assert!(!results.is_empty());
984        for result in &results {
985            assert_eq!(result.as_value()["metadata"]["city"], "Paris");
986        }
987    }
988
989    // ========================================================================
990    // Complex Query Tests
991    // ========================================================================
992
993    #[tokio::test]
994    async fn test_complex_nested_where() {
995        let adapter = create_test_adapter().await;
996
997        // (active = true) AND ((role = 'admin') OR (age >= 30))
998        let where_clause = WhereClause::And(vec![
999            WhereClause::Field {
1000                path:     vec!["active".to_string()],
1001                operator: WhereOperator::Eq,
1002                value:    json!(true),
1003            },
1004            WhereClause::Or(vec![
1005                WhereClause::Field {
1006                    path:     vec!["role".to_string()],
1007                    operator: WhereOperator::Eq,
1008                    value:    json!("admin"),
1009                },
1010                WhereClause::Field {
1011                    path:     vec!["age".to_string()],
1012                    operator: WhereOperator::Gte,
1013                    value:    json!(30),
1014                },
1015            ]),
1016        ]);
1017
1018        let results = adapter
1019            .execute_where_query("v_user", Some(&where_clause), None, None)
1020            .await
1021            .expect("Failed to execute query");
1022
1023        for result in &results {
1024            assert_eq!(result.as_value()["active"], true);
1025            let role = result.as_value()["role"].as_str().unwrap();
1026            let age = result.as_value()["age"].as_i64().unwrap();
1027            assert!(role == "admin" || age >= 30);
1028        }
1029    }
1030
1031    // ========================================================================
1032    // Error Handling Tests
1033    // ========================================================================
1034
1035    #[tokio::test]
1036    async fn test_invalid_view_name() {
1037        let adapter = create_test_adapter().await;
1038
1039        let result = adapter.execute_where_query("v_nonexistent", None, None, None).await;
1040
1041        assert!(result.is_err());
1042        match result {
1043            Err(FraiseQLError::Database { .. }) => (),
1044            _ => panic!("Expected Database error"),
1045        }
1046    }
1047
1048    #[tokio::test]
1049    async fn test_invalid_connection_string() {
1050        let result =
1051            PostgresAdapter::new("postgresql://invalid:invalid@localhost:9999/nonexistent").await;
1052
1053        assert!(result.is_err());
1054        match result {
1055            Err(FraiseQLError::ConnectionPool { .. }) => (),
1056            _ => panic!("Expected ConnectionPool error"),
1057        }
1058    }
1059
1060    // ========================================================================
1061    // Parameterized Query Tests (LIMIT/OFFSET with parameters)
1062    // ========================================================================
1063
1064    #[tokio::test]
1065    async fn test_parameterized_limit_only() {
1066        let adapter = create_test_adapter().await;
1067
1068        // Test that LIMIT is parameterized (not interpolated)
1069        let results = adapter
1070            .execute_where_query("v_user", None, Some(3), None)
1071            .await
1072            .expect("Failed to execute query");
1073
1074        assert_eq!(results.len(), 3, "Should return exactly 3 results with parameterized LIMIT");
1075    }
1076
1077    #[tokio::test]
1078    async fn test_parameterized_offset_only() {
1079        let adapter = create_test_adapter().await;
1080
1081        let results_all = adapter
1082            .execute_where_query("v_user", None, None, None)
1083            .await
1084            .expect("Failed to execute query");
1085
1086        let offset_val = 1;
1087        let results_offset = adapter
1088            .execute_where_query("v_user", None, None, Some(offset_val))
1089            .await
1090            .expect("Failed to execute query");
1091
1092        assert_eq!(results_offset.len(), results_all.len() - offset_val as usize);
1093    }
1094
1095    #[tokio::test]
1096    async fn test_parameterized_limit_and_offset() {
1097        let adapter = create_test_adapter().await;
1098
1099        // Query with both LIMIT and OFFSET parameterized
1100        let limit_val = 2;
1101        let offset_val = 1;
1102        let results = adapter
1103            .execute_where_query("v_user", None, Some(limit_val), Some(offset_val))
1104            .await
1105            .expect("Failed to execute query");
1106
1107        assert_eq!(
1108            results.len(),
1109            limit_val as usize,
1110            "Should return exactly {} results",
1111            limit_val
1112        );
1113    }
1114
1115    #[tokio::test]
1116    async fn test_parameterized_limit_with_where_clause() {
1117        let adapter = create_test_adapter().await;
1118
1119        let where_clause = WhereClause::Field {
1120            path:     vec!["active".to_string()],
1121            operator: WhereOperator::Eq,
1122            value:    json!(true),
1123        };
1124
1125        // Parameterized LIMIT with WHERE clause
1126        let results = adapter
1127            .execute_where_query("v_user", Some(&where_clause), Some(2), None)
1128            .await
1129            .expect("Failed to execute query");
1130
1131        assert!(results.len() <= 2);
1132        for result in &results {
1133            assert_eq!(result.as_value()["active"], true);
1134        }
1135    }
1136
1137    #[tokio::test]
1138    async fn test_parameterized_limit_and_offset_with_where_clause() {
1139        let adapter = create_test_adapter().await;
1140
1141        let where_clause = WhereClause::Field {
1142            path:     vec!["active".to_string()],
1143            operator: WhereOperator::Eq,
1144            value:    json!(true),
1145        };
1146
1147        // Parameterized LIMIT and OFFSET with WHERE clause
1148        let results = adapter
1149            .execute_where_query("v_user", Some(&where_clause), Some(2), Some(1))
1150            .await
1151            .expect("Failed to execute query");
1152
1153        assert!(results.len() <= 2);
1154        for result in &results {
1155            assert_eq!(result.as_value()["active"], true);
1156        }
1157    }
1158}