1use std::sync::Arc;
9
10use fraiseql_core::{db::traits::DatabaseAdapter, runtime::Executor, schema::CompiledSchema};
11use fraiseql_error::{FraiseQLError, Result};
12use serde::Deserialize;
13
14#[derive(Debug, Clone, Deserialize)]
16pub struct TenantPoolConfig {
17 pub connection_string: String,
19 #[serde(default = "default_max_connections")]
21 pub max_connections: u32,
22 #[serde(default = "default_connect_timeout")]
24 pub connect_timeout_secs: u64,
25 #[serde(default = "default_idle_timeout")]
27 pub idle_timeout_secs: u64,
28}
29
30const fn default_max_connections() -> u32 {
31 10
32}
33const fn default_connect_timeout() -> u64 {
34 5
35}
36const fn default_idle_timeout() -> u64 {
37 300
38}
39
40#[async_trait::async_trait]
45pub trait FromPoolConfig: DatabaseAdapter + Sized {
46 async fn from_pool_config(config: &TenantPoolConfig) -> Result<Self>;
53}
54
55pub async fn create_tenant_executor<A: FromPoolConfig>(
69 schema_json: &str,
70 pool_config: &TenantPoolConfig,
71) -> Result<Arc<Executor<A>>> {
72 let schema = CompiledSchema::from_json(schema_json).map_err(|e| FraiseQLError::Parse {
74 message: format!("Invalid compiled schema JSON: {e}"),
75 location: String::new(),
76 })?;
77
78 schema
79 .validate_format_version()
80 .map_err(|msg| FraiseQLError::validation(format!("Incompatible compiled schema: {msg}")))?;
81
82 let adapter = A::from_pool_config(pool_config).await?;
84
85 Ok(Arc::new(Executor::new(schema, Arc::new(adapter))))
87}
88
89#[cfg(test)]
90mod tests {
91 #![allow(clippy::unwrap_used)] #![allow(clippy::missing_panics_doc)] #![allow(clippy::missing_errors_doc)] #![allow(missing_docs)] use async_trait::async_trait;
97 use fraiseql_core::{
98 db::{
99 WhereClause,
100 traits::DatabaseAdapter,
101 types::{DatabaseType, JsonbValue, PoolMetrics},
102 },
103 error::Result as FraiseQLResult,
104 schema::CompiledSchema,
105 };
106
107 use super::*;
108
109 #[derive(Debug, Clone)]
111 struct StubPoolAdapter;
112
113 #[async_trait]
114 impl DatabaseAdapter for StubPoolAdapter {
115 async fn execute_where_query(
116 &self,
117 _view: &str,
118 _where_clause: Option<&WhereClause>,
119 _limit: Option<u32>,
120 _offset: Option<u32>,
121 _order_by: Option<&[fraiseql_core::db::types::OrderByClause]>,
122
123 _session_vars: &[(&str, &str)],
124 ) -> FraiseQLResult<Vec<JsonbValue>> {
125 Ok(vec![])
126 }
127
128 async fn execute_with_projection(
129 &self,
130 _view: &str,
131 _projection: Option<&fraiseql_core::schema::SqlProjectionHint>,
132 _where_clause: Option<&WhereClause>,
133 _limit: Option<u32>,
134 _offset: Option<u32>,
135 _order_by: Option<&[fraiseql_core::db::types::OrderByClause]>,
136
137 _session_vars: &[(&str, &str)],
138 ) -> FraiseQLResult<Vec<JsonbValue>> {
139 Ok(vec![])
140 }
141
142 fn database_type(&self) -> DatabaseType {
143 DatabaseType::SQLite
144 }
145
146 async fn health_check(&self) -> FraiseQLResult<()> {
147 Ok(())
148 }
149
150 fn pool_metrics(&self) -> PoolMetrics {
151 PoolMetrics::default()
152 }
153
154 async fn execute_raw_query(
155 &self,
156 _sql: &str,
157 ) -> FraiseQLResult<Vec<std::collections::HashMap<String, serde_json::Value>>> {
158 Ok(vec![])
159 }
160
161 async fn execute_parameterized_aggregate(
162 &self,
163 _sql: &str,
164 _params: &[serde_json::Value], _session_vars: &[(&str, &str)],
165
166 ) -> FraiseQLResult<Vec<std::collections::HashMap<String, serde_json::Value>>> {
167 Ok(vec![])
168 }
169 }
170
171 #[async_trait]
172 impl FromPoolConfig for StubPoolAdapter {
173 async fn from_pool_config(_config: &TenantPoolConfig) -> FraiseQLResult<Self> {
174 Ok(Self)
175 }
176 }
177
178 fn test_pool_config() -> TenantPoolConfig {
179 TenantPoolConfig {
180 connection_string: "stub://localhost/test".to_string(),
181 max_connections: 5,
182 connect_timeout_secs: 5,
183 idle_timeout_secs: 300,
184 }
185 }
186
187 #[tokio::test]
188 async fn test_create_tenant_executor_success() {
189 let schema = CompiledSchema::default();
190 let schema_json = serde_json::to_string(&schema).unwrap();
191 let config = test_pool_config();
192
193 let executor =
194 create_tenant_executor::<StubPoolAdapter>(&schema_json, &config).await.unwrap();
195 assert_eq!(executor.schema().types.len(), 0);
196 }
197
198 #[tokio::test]
199 async fn test_create_tenant_executor_invalid_json() {
200 let config = test_pool_config();
201 let Err(err) = create_tenant_executor::<StubPoolAdapter>("not valid json", &config).await
202 else {
203 panic!("expected Err for invalid JSON");
204 };
205 assert!(matches!(err, FraiseQLError::Parse { .. }), "Expected Parse error, got: {err:?}");
206 }
207
208 #[tokio::test]
209 async fn test_create_tenant_executor_bad_format_version() {
210 let schema = CompiledSchema {
211 schema_format_version: Some(999),
212 ..CompiledSchema::default()
213 };
214 let schema_json = serde_json::to_string(&schema).unwrap();
215 let config = test_pool_config();
216
217 let Err(err) = create_tenant_executor::<StubPoolAdapter>(&schema_json, &config).await
218 else {
219 panic!("expected Err for bad format version");
220 };
221 assert!(
222 matches!(err, FraiseQLError::Validation { .. }),
223 "Expected Validation error, got: {err:?}"
224 );
225 }
226
227 #[derive(Debug, Clone)]
229 struct FailingAdapter;
230
231 #[async_trait]
232 impl DatabaseAdapter for FailingAdapter {
233 async fn execute_where_query(
234 &self,
235 _view: &str,
236 _where_clause: Option<&WhereClause>,
237 _limit: Option<u32>,
238 _offset: Option<u32>,
239 _order_by: Option<&[fraiseql_core::db::types::OrderByClause]>,
240
241 _session_vars: &[(&str, &str)],
242 ) -> FraiseQLResult<Vec<JsonbValue>> {
243 Ok(vec![])
244 }
245
246 async fn execute_with_projection(
247 &self,
248 _view: &str,
249 _projection: Option<&fraiseql_core::schema::SqlProjectionHint>,
250 _where_clause: Option<&WhereClause>,
251 _limit: Option<u32>,
252 _offset: Option<u32>,
253 _order_by: Option<&[fraiseql_core::db::types::OrderByClause]>,
254
255 _session_vars: &[(&str, &str)],
256 ) -> FraiseQLResult<Vec<JsonbValue>> {
257 Ok(vec![])
258 }
259
260 fn database_type(&self) -> DatabaseType {
261 DatabaseType::PostgreSQL
262 }
263
264 async fn health_check(&self) -> FraiseQLResult<()> {
265 Err(FraiseQLError::database("connection refused"))
266 }
267
268 fn pool_metrics(&self) -> PoolMetrics {
269 PoolMetrics::default()
270 }
271
272 async fn execute_raw_query(
273 &self,
274 _sql: &str,
275 ) -> FraiseQLResult<Vec<std::collections::HashMap<String, serde_json::Value>>> {
276 Ok(vec![])
277 }
278
279 async fn execute_parameterized_aggregate(
280 &self,
281 _sql: &str,
282 _params: &[serde_json::Value], _session_vars: &[(&str, &str)],
283
284 ) -> FraiseQLResult<Vec<std::collections::HashMap<String, serde_json::Value>>> {
285 Ok(vec![])
286 }
287 }
288
289 #[async_trait]
290 impl FromPoolConfig for FailingAdapter {
291 async fn from_pool_config(_config: &TenantPoolConfig) -> FraiseQLResult<Self> {
292 Err(FraiseQLError::ConnectionPool {
293 message: "connection refused".to_string(),
294 })
295 }
296 }
297
298 #[tokio::test]
299 async fn test_create_tenant_executor_unreachable_db() {
300 let schema = CompiledSchema::default();
301 let schema_json = serde_json::to_string(&schema).unwrap();
302 let config = test_pool_config();
303
304 let Err(err) = create_tenant_executor::<FailingAdapter>(&schema_json, &config).await else {
305 panic!("expected Err for unreachable DB");
306 };
307 assert!(
308 matches!(err, FraiseQLError::ConnectionPool { .. }),
309 "Expected ConnectionPool error, got: {err:?}"
310 );
311 }
312
313 #[test]
314 fn test_pool_config_defaults() {
315 let json = r#"{"connection_string": "postgres://localhost/test"}"#;
316 let config: TenantPoolConfig = serde_json::from_str(json).unwrap();
317 assert_eq!(config.max_connections, 10);
318 assert_eq!(config.connect_timeout_secs, 5);
319 assert_eq!(config.idle_timeout_secs, 300);
320 }
321}