1use async_trait::async_trait;
4use deadpool_postgres::{Config, ManagerConfig, Pool, RecyclingMethod, Runtime};
5use tokio_postgres::{NoTls, Row};
6
7use super::where_generator::PostgresWhereGenerator;
8use crate::{
9 db::{
10 identifier::quote_postgres_identifier,
11 traits::DatabaseAdapter,
12 types::{DatabaseType, JsonbValue, PoolMetrics, QueryParam},
13 where_clause::WhereClause,
14 },
15 error::{FraiseQLError, Result},
16 schema::SqlProjectionHint,
17};
18
/// Maximum pool size used by `PostgresAdapter::new` when the caller does not
/// pick one explicitly.
const DEFAULT_POOL_SIZE: usize = 25;

/// Total number of attempts made to check a connection out of the pool before
/// giving up. The first attempt is included in this count.
const MAX_CONNECTION_RETRIES: u32 = 3;

/// Base delay, in milliseconds, between connection acquisition attempts.
/// The wait grows linearly: after failed attempt `n` (1-based) the adapter
/// sleeps `n * CONNECTION_RETRY_DELAY_MS` before trying again.
const CONNECTION_RETRY_DELAY_MS: u64 = 50;
29
30#[derive(Clone)]
61pub struct PostgresAdapter {
62 pool: Pool,
63}
64
65impl PostgresAdapter {
66 pub async fn new(connection_string: &str) -> Result<Self> {
86 Self::with_pool_size(connection_string, DEFAULT_POOL_SIZE).await
87 }
88
89 pub async fn with_pool_config(
106 connection_string: &str,
107 _min_size: usize,
108 max_size: usize,
109 ) -> Result<Self> {
110 Self::with_pool_size(connection_string, max_size).await
111 }
112
113 pub async fn with_pool_size(connection_string: &str, max_size: usize) -> Result<Self> {
124 let mut cfg = Config::new();
125 cfg.url = Some(connection_string.to_string());
126 cfg.manager = Some(ManagerConfig {
127 recycling_method: RecyclingMethod::Fast,
128 });
129 cfg.pool = Some(deadpool_postgres::PoolConfig::new(max_size));
130
131 let pool = cfg.create_pool(Some(Runtime::Tokio1), NoTls).map_err(|e| {
132 FraiseQLError::ConnectionPool {
133 message: format!("Failed to create connection pool: {e}"),
134 }
135 })?;
136
137 let client = pool.get().await.map_err(|e| FraiseQLError::ConnectionPool {
139 message: format!("Failed to acquire connection: {e}"),
140 })?;
141
142 client.query("SELECT 1", &[]).await.map_err(|e| FraiseQLError::Database {
143 message: format!("Failed to connect to database: {e}"),
144 sql_state: e.code().map(|c| c.code().to_string()),
145 })?;
146
147 Ok(Self { pool })
148 }
149
150 #[must_use]
154 pub fn pool(&self) -> &Pool {
155 &self.pool
156 }
157
158 async fn execute_raw(
164 &self,
165 sql: &str,
166 params: &[&(dyn tokio_postgres::types::ToSql + Sync)],
167 ) -> Result<Vec<JsonbValue>> {
168 let client = self.acquire_connection_with_retry().await?;
169
170 let rows: Vec<Row> =
171 client.query(sql, params).await.map_err(|e| FraiseQLError::Database {
172 message: format!("Query execution failed: {e}"),
173 sql_state: e.code().map(|c| c.code().to_string()),
174 })?;
175
176 let results = rows
177 .into_iter()
178 .map(|row| {
179 let data: serde_json::Value = row.get(0);
180 JsonbValue::new(data)
181 })
182 .collect();
183
184 Ok(results)
185 }
186
187 async fn acquire_connection_with_retry(&self) -> Result<deadpool_postgres::Client> {
197 let mut last_error = None;
198
199 for attempt in 0..MAX_CONNECTION_RETRIES {
200 match self.pool.get().await {
201 Ok(client) => {
202 if attempt > 0 {
203 tracing::info!(
204 "Successfully acquired connection after {} retries",
205 attempt
206 );
207 }
208 return Ok(client);
209 },
210 Err(e) => {
211 last_error = Some(e);
212 if attempt < MAX_CONNECTION_RETRIES - 1 {
213 let delay = CONNECTION_RETRY_DELAY_MS * (u64::from(attempt) + 1);
214 tracing::warn!(
215 "Connection pool exhausted (attempt {}/{}), retrying in {}ms...",
216 attempt + 1,
217 MAX_CONNECTION_RETRIES,
218 delay
219 );
220 tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
221 }
222 },
223 }
224 }
225
226 let pool_metrics = self.pool_metrics();
228 tracing::error!(
229 "Failed to acquire connection after {} retries. Pool state: available={}, active={}, total={}",
230 MAX_CONNECTION_RETRIES,
231 pool_metrics.idle_connections,
232 pool_metrics.active_connections,
233 pool_metrics.total_connections
234 );
235
236 Err(FraiseQLError::ConnectionPool {
237 message: format!(
238 "Failed to acquire connection after {} retries: {}. Pool exhausted (available={}/{}). Consider increasing pool size or reducing concurrent load.",
239 MAX_CONNECTION_RETRIES,
240 last_error.unwrap(),
241 pool_metrics.idle_connections,
242 pool_metrics.total_connections
243 ),
244 })
245 }
246
247 pub async fn execute_with_projection(
282 &self,
283 view: &str,
284 projection: Option<&SqlProjectionHint>,
285 where_clause: Option<&WhereClause>,
286 limit: Option<u32>,
287 ) -> Result<Vec<JsonbValue>> {
288 if projection.is_none() {
290 return self.execute_where_query(view, where_clause, limit, None).await;
291 }
292
293 let projection = projection.unwrap();
294
295 let mut sql = format!(
299 "SELECT {} FROM {}",
300 projection.projection_template,
301 quote_postgres_identifier(view)
302 );
303
304 if let Some(clause) = where_clause {
306 let generator = PostgresWhereGenerator::new();
307 let (where_sql, where_params) = generator.generate(clause)?;
308 sql.push_str(" WHERE ");
309 sql.push_str(&where_sql);
310
311 let mut params = where_params;
313 let mut param_count = params.len();
314
315 if let Some(lim) = limit {
316 param_count += 1;
317 sql.push_str(&format!(" LIMIT ${param_count}"));
318 params.push(serde_json::Value::Number(lim.into()));
319 }
320
321 let typed_params: Vec<QueryParam> = params.into_iter().map(QueryParam::from).collect();
323
324 tracing::debug!("SQL with projection = {}", sql);
325 tracing::debug!("typed_params = {:?}", typed_params);
326
327 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = typed_params
329 .iter()
330 .map(|p| p as &(dyn tokio_postgres::types::ToSql + Sync))
331 .collect();
332
333 self.execute_raw(&sql, ¶m_refs).await
334 } else {
335 let mut params: Vec<serde_json::Value> = vec![];
337 let mut param_count = 0;
338
339 if let Some(lim) = limit {
340 param_count += 1;
341 sql.push_str(&format!(" LIMIT ${param_count}"));
342 params.push(serde_json::Value::Number(lim.into()));
343 }
344
345 let typed_params: Vec<QueryParam> = params.into_iter().map(QueryParam::from).collect();
347
348 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = typed_params
350 .iter()
351 .map(|p| p as &(dyn tokio_postgres::types::ToSql + Sync))
352 .collect();
353
354 self.execute_raw(&sql, ¶m_refs).await
355 }
356 }
357}
358
359#[async_trait]
360impl DatabaseAdapter for PostgresAdapter {
361 async fn execute_with_projection(
362 &self,
363 view: &str,
364 projection: Option<&SqlProjectionHint>,
365 where_clause: Option<&WhereClause>,
366 limit: Option<u32>,
367 ) -> Result<Vec<JsonbValue>> {
368 self.execute_with_projection(view, projection, where_clause, limit).await
369 }
370
371 async fn execute_where_query(
372 &self,
373 view: &str,
374 where_clause: Option<&WhereClause>,
375 limit: Option<u32>,
376 offset: Option<u32>,
377 ) -> Result<Vec<JsonbValue>> {
378 let mut sql = format!("SELECT data FROM {}", quote_postgres_identifier(view));
380
381 let mut typed_params: Vec<QueryParam> = Vec::new();
383 let mut param_count = 0;
384
385 if let Some(clause) = where_clause {
387 let generator = PostgresWhereGenerator::new();
388 let (where_sql, where_params) = generator.generate(clause)?;
389 sql.push_str(" WHERE ");
390 sql.push_str(&where_sql);
391
392 typed_params = where_params.into_iter().map(QueryParam::from).collect();
394 param_count = typed_params.len();
395 }
396
397 if let Some(lim) = limit {
399 param_count += 1;
400 sql.push_str(&format!(" LIMIT ${param_count}"));
401 typed_params.push(QueryParam::BigInt(i64::from(lim)));
402 }
403
404 if let Some(off) = offset {
406 param_count += 1;
407 sql.push_str(&format!(" OFFSET ${param_count}"));
408 typed_params.push(QueryParam::BigInt(i64::from(off)));
409 }
410
411 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = typed_params
413 .iter()
414 .map(|p| p as &(dyn tokio_postgres::types::ToSql + Sync))
415 .collect();
416
417 self.execute_raw(&sql, ¶m_refs).await
418 }
419
420 fn database_type(&self) -> DatabaseType {
421 DatabaseType::PostgreSQL
422 }
423
424 async fn health_check(&self) -> Result<()> {
425 let client = self.acquire_connection_with_retry().await?;
427
428 client.query("SELECT 1", &[]).await.map_err(|e| FraiseQLError::Database {
429 message: format!("Health check failed: {e}"),
430 sql_state: e.code().map(|c| c.code().to_string()),
431 })?;
432
433 Ok(())
434 }
435
436 fn pool_metrics(&self) -> PoolMetrics {
437 let status = self.pool.status();
438
439 PoolMetrics {
440 total_connections: status.size as u32,
441 idle_connections: status.available as u32,
442 active_connections: (status.size - status.available) as u32,
443 waiting_requests: status.waiting as u32,
444 }
445 }
446
447 async fn execute_raw_query(
448 &self,
449 sql: &str,
450 ) -> Result<Vec<std::collections::HashMap<String, serde_json::Value>>> {
451 let client = self.acquire_connection_with_retry().await?;
453
454 let rows: Vec<Row> = client.query(sql, &[]).await.map_err(|e| FraiseQLError::Database {
455 message: format!("Query execution failed: {e}"),
456 sql_state: e.code().map(|c| c.code().to_string()),
457 })?;
458
459 let results: Vec<std::collections::HashMap<String, serde_json::Value>> = rows
461 .into_iter()
462 .map(|row| {
463 let mut map = std::collections::HashMap::new();
464
465 for (idx, column) in row.columns().iter().enumerate() {
467 let column_name = column.name().to_string();
468
469 let value: serde_json::Value = if let Ok(v) = row.try_get::<_, i32>(idx) {
471 serde_json::json!(v)
472 } else if let Ok(v) = row.try_get::<_, i64>(idx) {
473 serde_json::json!(v)
474 } else if let Ok(v) = row.try_get::<_, f64>(idx) {
475 serde_json::json!(v)
476 } else if let Ok(v) = row.try_get::<_, String>(idx) {
477 serde_json::json!(v)
478 } else if let Ok(v) = row.try_get::<_, bool>(idx) {
479 serde_json::json!(v)
480 } else if let Ok(v) = row.try_get::<_, serde_json::Value>(idx) {
481 v
482 } else {
483 serde_json::Value::Null
485 };
486
487 map.insert(column_name, value);
488 }
489
490 map
491 })
492 .collect();
493
494 Ok(results)
495 }
496
497 async fn execute_function_call(
498 &self,
499 function_name: &str,
500 args: &[serde_json::Value],
501 ) -> Result<Vec<std::collections::HashMap<String, serde_json::Value>>> {
502 let placeholders: Vec<String> =
504 (1..=args.len()).map(|i| format!("${i}")).collect();
505 let sql = format!(
506 "SELECT * FROM {function_name}({})",
507 placeholders.join(", ")
508 );
509
510 let client = self.acquire_connection_with_retry().await?;
511
512 let params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = args
514 .iter()
515 .map(|v| v as &(dyn tokio_postgres::types::ToSql + Sync))
516 .collect();
517
518 let rows: Vec<Row> =
519 client.query(sql.as_str(), params.as_slice()).await.map_err(|e| {
520 FraiseQLError::Database {
521 message: format!("Function call {function_name} failed: {e}"),
522 sql_state: e.code().map(|c| c.code().to_string()),
523 }
524 })?;
525
526 let results = rows
527 .into_iter()
528 .map(|row| {
529 let mut map = std::collections::HashMap::new();
530 for (idx, column) in row.columns().iter().enumerate() {
531 let column_name = column.name().to_string();
532 let value: serde_json::Value =
533 if let Ok(v) = row.try_get::<_, i32>(idx) {
534 serde_json::json!(v)
535 } else if let Ok(v) = row.try_get::<_, i64>(idx) {
536 serde_json::json!(v)
537 } else if let Ok(v) = row.try_get::<_, f64>(idx) {
538 serde_json::json!(v)
539 } else if let Ok(v) = row.try_get::<_, bool>(idx) {
540 serde_json::json!(v)
541 } else if let Ok(v) = row.try_get::<_, serde_json::Value>(idx) {
542 v
543 } else if let Ok(v) = row.try_get::<_, String>(idx) {
544 serde_json::json!(v)
545 } else {
546 serde_json::Value::Null
547 };
548 map.insert(column_name, value);
549 }
550 map
551 })
552 .collect();
553
554 Ok(results)
555 }
556}
557
// Integration tests. They need the seeded PostgreSQL test database reachable
// at `TEST_DB_URL` and only compile with the `test-postgres` feature. The
// assertions below expect 5 rows in `v_user` and 4 rows in `v_post`.
#[cfg(all(test, feature = "test-postgres"))]
mod tests {
    use serde_json::json;

    use super::*;
    use crate::db::{WhereClause, WhereOperator};

    const TEST_DB_URL: &str =
        "postgresql://fraiseql_test:fraiseql_test_password@localhost:5433/test_fraiseql";

    /// Connects to the test database with the default pool size.
    async fn create_test_adapter() -> PostgresAdapter {
        PostgresAdapter::new(TEST_DB_URL)
            .await
            .expect("Failed to create test adapter - is PostgreSQL running? Use: docker compose -f docker-compose.test.yml up -d postgres-test")
    }

    /// Builds a single-field predicate on the JSON path given by `path`.
    fn field(path: &[&str], operator: WhereOperator, value: serde_json::Value) -> WhereClause {
        WhereClause::Field {
            path: path.iter().map(|segment| (*segment).to_string()).collect(),
            operator,
            value,
        }
    }

    /// Runs `execute_where_query` against `v_user` on a fresh adapter and
    /// panics if the query fails.
    async fn query_users(
        where_clause: Option<&WhereClause>,
        limit: Option<u32>,
        offset: Option<u32>,
    ) -> Vec<JsonbValue> {
        create_test_adapter()
            .await
            .execute_where_query("v_user", where_clause, limit, offset)
            .await
            .expect("Failed to execute query")
    }

    // ---- adapter construction, health and metrics ----

    #[tokio::test]
    async fn test_adapter_creation() {
        let adapter = create_test_adapter().await;
        assert!(adapter.pool_metrics().total_connections > 0);
        assert_eq!(adapter.database_type(), DatabaseType::PostgreSQL);
    }

    #[tokio::test]
    async fn test_adapter_with_custom_pool_size() {
        let adapter = PostgresAdapter::with_pool_size(TEST_DB_URL, 5)
            .await
            .expect("Failed to create adapter");

        let metrics = adapter.pool_metrics();
        assert!(metrics.total_connections >= 1, "Pool should have at least 1 connection");
        assert!(metrics.total_connections <= 5, "Pool should not exceed max_size of 5");
    }

    #[tokio::test]
    async fn test_health_check() {
        let adapter = create_test_adapter().await;
        adapter.health_check().await.expect("Health check failed");
    }

    #[tokio::test]
    async fn test_pool_metrics() {
        let adapter = create_test_adapter().await;
        let metrics = adapter.pool_metrics();

        assert!(metrics.total_connections > 0);
        assert!(metrics.idle_connections <= metrics.total_connections);
        assert_eq!(
            metrics.active_connections,
            metrics.total_connections - metrics.idle_connections
        );
    }

    // ---- unfiltered queries ----

    #[tokio::test]
    async fn test_query_all_users() {
        let adapter = create_test_adapter().await;

        let results = adapter
            .execute_where_query("v_user", None, None, None)
            .await
            .expect("Failed to query users");

        assert_eq!(results.len(), 5, "Should have 5 test users");

        let first_user = results[0].as_value();
        assert!(first_user.get("id").is_some());
        assert!(first_user.get("email").is_some());
        assert!(first_user.get("name").is_some());
    }

    #[tokio::test]
    async fn test_query_all_posts() {
        let adapter = create_test_adapter().await;

        let results = adapter
            .execute_where_query("v_post", None, None, None)
            .await
            .expect("Failed to query posts");

        assert_eq!(results.len(), 4, "Should have 4 test posts");

        let first_post = results[0].as_value();
        assert!(first_post.get("author").is_some());
        assert!(first_post["author"].get("name").is_some());
    }

    // ---- comparison operators ----

    #[tokio::test]
    async fn test_where_eq() {
        let where_clause = field(&["email"], WhereOperator::Eq, json!("alice@example.com"));
        let results = query_users(Some(&where_clause), None, None).await;

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].as_value()["email"], "alice@example.com");
    }

    #[tokio::test]
    async fn test_where_neq() {
        let where_clause = field(&["role"], WhereOperator::Neq, json!("user"));
        let results = query_users(Some(&where_clause), None, None).await;

        assert!(results.len() >= 2);
        for result in &results {
            assert_ne!(result.as_value()["role"], "user");
        }
    }

    #[tokio::test]
    async fn test_where_gt() {
        let where_clause = field(&["age"], WhereOperator::Gt, json!(30));
        let results = query_users(Some(&where_clause), None, None).await;

        assert_eq!(results.len(), 1, "Should return exactly 1 user (Charlie with age 35)");

        for result in &results {
            let age = result.as_value()["age"].as_i64().unwrap();
            assert!(age > 30, "Age should be > 30, but got {age}");
        }
    }

    #[tokio::test]
    async fn test_where_gte() {
        let where_clause = field(&["age"], WhereOperator::Gte, json!(30));
        let results = query_users(Some(&where_clause), None, None).await;

        for result in &results {
            let age = result.as_value()["age"].as_i64().unwrap();
            assert!(age >= 30);
        }
    }

    // ---- string operators ----

    #[tokio::test]
    async fn test_where_icontains() {
        let where_clause = field(&["email"], WhereOperator::Icontains, json!("example.com"));
        let results = query_users(Some(&where_clause), None, None).await;

        assert!(results.len() >= 3);
        for result in &results {
            let email = result.as_value()["email"].as_str().unwrap();
            assert!(email.to_lowercase().contains("example.com"));
        }
    }

    #[tokio::test]
    async fn test_where_startswith() {
        let where_clause = field(&["name"], WhereOperator::Startswith, json!("Alice"));
        let results = query_users(Some(&where_clause), None, None).await;

        assert_eq!(results.len(), 1);
        assert!(results[0].as_value()["name"].as_str().unwrap().starts_with("Alice"));
    }

    // ---- logical combinators ----

    #[tokio::test]
    async fn test_where_and() {
        let where_clause = WhereClause::And(vec![
            field(&["active"], WhereOperator::Eq, json!(true)),
            field(&["age"], WhereOperator::Gte, json!(25)),
        ]);
        let results = query_users(Some(&where_clause), None, None).await;

        for result in &results {
            assert_eq!(result.as_value()["active"], true);
            let age = result.as_value()["age"].as_i64().unwrap();
            assert!(age >= 25);
        }
    }

    #[tokio::test]
    async fn test_where_or() {
        let where_clause = WhereClause::Or(vec![
            field(&["role"], WhereOperator::Eq, json!("admin")),
            field(&["role"], WhereOperator::Eq, json!("moderator")),
        ]);
        let results = query_users(Some(&where_clause), None, None).await;

        assert!(results.len() >= 2);
        for result in &results {
            let role = result.as_value()["role"].as_str().unwrap();
            assert!(role == "admin" || role == "moderator");
        }
    }

    #[tokio::test]
    async fn test_where_not() {
        let where_clause =
            WhereClause::Not(Box::new(field(&["active"], WhereOperator::Eq, json!(true))));
        let results = query_users(Some(&where_clause), None, None).await;

        for result in &results {
            assert_ne!(result.as_value()["active"], json!(true));
        }
    }

    // ---- array operators ----

    #[tokio::test]
    async fn test_where_in() {
        let where_clause = field(&["role"], WhereOperator::In, json!(["admin", "moderator"]));
        let results = query_users(Some(&where_clause), None, None).await;

        assert!(results.len() >= 2);
        for result in &results {
            let role = result.as_value()["role"].as_str().unwrap();
            assert!(role == "admin" || role == "moderator");
        }
    }

    // ---- pagination ----

    #[tokio::test]
    async fn test_limit() {
        let results = query_users(None, Some(2), None).await;
        assert_eq!(results.len(), 2);
    }

    #[tokio::test]
    async fn test_offset() {
        let results_all = query_users(None, None, None).await;
        let results_offset = query_users(None, None, Some(2)).await;

        assert_eq!(results_offset.len(), results_all.len() - 2);
    }

    #[tokio::test]
    async fn test_limit_and_offset() {
        let results = query_users(None, Some(2), Some(1)).await;
        assert_eq!(results.len(), 2);
    }

    // ---- nested JSON paths ----

    #[tokio::test]
    async fn test_nested_object_query() {
        let where_clause = field(&["metadata", "city"], WhereOperator::Eq, json!("Paris"));
        let results = query_users(Some(&where_clause), None, None).await;

        assert!(!results.is_empty());
        for result in &results {
            assert_eq!(result.as_value()["metadata"]["city"], "Paris");
        }
    }

    // ---- nested combinators: active AND (role = admin OR age >= 30) ----

    #[tokio::test]
    async fn test_complex_nested_where() {
        let where_clause = WhereClause::And(vec![
            field(&["active"], WhereOperator::Eq, json!(true)),
            WhereClause::Or(vec![
                field(&["role"], WhereOperator::Eq, json!("admin")),
                field(&["age"], WhereOperator::Gte, json!(30)),
            ]),
        ]);
        let results = query_users(Some(&where_clause), None, None).await;

        for result in &results {
            assert_eq!(result.as_value()["active"], true);
            let role = result.as_value()["role"].as_str().unwrap();
            let age = result.as_value()["age"].as_i64().unwrap();
            assert!(role == "admin" || age >= 30);
        }
    }

    // ---- error reporting ----

    #[tokio::test]
    async fn test_invalid_view_name() {
        let adapter = create_test_adapter().await;

        let result = adapter.execute_where_query("v_nonexistent", None, None, None).await;

        assert!(
            matches!(result, Err(FraiseQLError::Database { .. })),
            "Expected Database error"
        );
    }

    #[tokio::test]
    async fn test_invalid_connection_string() {
        let result =
            PostgresAdapter::new("postgresql://invalid:invalid@localhost:9999/nonexistent").await;

        assert!(
            matches!(result, Err(FraiseQLError::ConnectionPool { .. })),
            "Expected ConnectionPool error"
        );
    }

    // ---- LIMIT / OFFSET passed as bound parameters ----

    #[tokio::test]
    async fn test_parameterized_limit_only() {
        let results = query_users(None, Some(3), None).await;

        assert_eq!(results.len(), 3, "Should return exactly 3 results with parameterized LIMIT");
    }

    #[tokio::test]
    async fn test_parameterized_offset_only() {
        let results_all = query_users(None, None, None).await;

        let offset_val = 1;
        let results_offset = query_users(None, None, Some(offset_val)).await;

        assert_eq!(results_offset.len(), results_all.len() - offset_val as usize);
    }

    #[tokio::test]
    async fn test_parameterized_limit_and_offset() {
        let limit_val = 2;
        let offset_val = 1;
        let results = query_users(None, Some(limit_val), Some(offset_val)).await;

        assert_eq!(
            results.len(),
            limit_val as usize,
            "Should return exactly {limit_val} results"
        );
    }

    #[tokio::test]
    async fn test_parameterized_limit_with_where_clause() {
        let where_clause = field(&["active"], WhereOperator::Eq, json!(true));
        let results = query_users(Some(&where_clause), Some(2), None).await;

        assert!(results.len() <= 2);
        for result in &results {
            assert_eq!(result.as_value()["active"], true);
        }
    }

    #[tokio::test]
    async fn test_parameterized_limit_and_offset_with_where_clause() {
        let where_clause = field(&["active"], WhereOperator::Eq, json!(true));
        let results = query_users(Some(&where_clause), Some(2), Some(1)).await;

        assert!(results.len() <= 2);
        for result in &results {
            assert_eq!(result.as_value()["active"], true);
        }
    }
}