Skip to main content

fraiseql_server/tenancy/
pool_factory.rs

1//! Tenant pool creation and executor construction.
2//!
3//! Provides [`TenantPoolConfig`] and [`create_tenant_executor`] to build a
4//! fully-formed `Executor<A>` from a compiled schema JSON string and database
5//! connection configuration. Used by the management API (Phase 03) to register
6//! tenants at runtime.
7
8use std::sync::Arc;
9
10use fraiseql_core::{db::traits::DatabaseAdapter, runtime::Executor, schema::CompiledSchema};
11use fraiseql_error::{FraiseQLError, Result};
12use serde::Deserialize;
13
14/// Connection configuration for a tenant database pool.
15#[derive(Debug, Clone, Deserialize)]
16pub struct TenantPoolConfig {
17    /// Database connection string (e.g. `postgres://user:pass@host:5432/db`).
18    pub connection_string:    String,
19    /// Maximum number of connections in the pool.
20    #[serde(default = "default_max_connections")]
21    pub max_connections:      u32,
22    /// Connection timeout in seconds.
23    #[serde(default = "default_connect_timeout")]
24    pub connect_timeout_secs: u64,
25    /// Idle connection timeout in seconds.
26    #[serde(default = "default_idle_timeout")]
27    pub idle_timeout_secs:    u64,
28}
29
/// Serde fallback for `TenantPoolConfig::max_connections` when the field is
/// absent from the input.
const fn default_max_connections() -> u32 {
    const DEFAULT_MAX_CONNECTIONS: u32 = 10;
    DEFAULT_MAX_CONNECTIONS
}
/// Serde fallback for `TenantPoolConfig::connect_timeout_secs` (in seconds)
/// when the field is absent from the input.
const fn default_connect_timeout() -> u64 {
    const DEFAULT_CONNECT_TIMEOUT_SECS: u64 = 5;
    DEFAULT_CONNECT_TIMEOUT_SECS
}
/// Serde fallback for `TenantPoolConfig::idle_timeout_secs` (in seconds)
/// when the field is absent from the input.
const fn default_idle_timeout() -> u64 {
    const DEFAULT_IDLE_TIMEOUT_SECS: u64 = 300;
    DEFAULT_IDLE_TIMEOUT_SECS
}
39
40/// Trait for database adapters that can be created from a connection string.
41///
42/// Implemented by adapters that support dynamic pool creation at runtime
43/// (as opposed to static initialization at server startup).
44#[async_trait::async_trait]
45pub trait FromPoolConfig: DatabaseAdapter + Sized {
46    /// Create a new adapter from connection configuration.
47    ///
48    /// # Errors
49    ///
50    /// Returns `FraiseQLError::ConnectionPool` or `FraiseQLError::Database`
51    /// if the connection cannot be established.
52    async fn from_pool_config(config: &TenantPoolConfig) -> Result<Self>;
53}
54
55/// Creates a complete tenant executor from a compiled schema JSON string and
56/// connection configuration.
57///
58/// This is the primary entry point for tenant registration: it parses the schema,
59/// validates its format version, creates a database pool, and assembles an
60/// `Executor<A>` with both baked in.
61///
62/// # Errors
63///
64/// Returns `FraiseQLError::Parse` if the schema JSON is invalid.
65/// Returns `FraiseQLError::Validation` if the schema format version is unsupported.
66/// Returns `FraiseQLError::ConnectionPool` / `FraiseQLError::Database` if the pool
67/// cannot be created.
68pub async fn create_tenant_executor<A: FromPoolConfig>(
69    schema_json: &str,
70    pool_config: &TenantPoolConfig,
71) -> Result<Arc<Executor<A>>> {
72    // 1. Parse and validate schema
73    let schema = CompiledSchema::from_json(schema_json).map_err(|e| FraiseQLError::Parse {
74        message:  format!("Invalid compiled schema JSON: {e}"),
75        location: String::new(),
76    })?;
77
78    schema
79        .validate_format_version()
80        .map_err(|msg| FraiseQLError::validation(format!("Incompatible compiled schema: {msg}")))?;
81
82    // 2. Create database adapter/pool
83    let adapter = A::from_pool_config(pool_config).await?;
84
85    // 3. Assemble executor
86    Ok(Arc::new(Executor::new(schema, Arc::new(adapter))))
87}
88
89#[cfg(test)]
90mod tests {
91    #![allow(clippy::unwrap_used)] // Reason: test code, panics acceptable
92    #![allow(clippy::missing_panics_doc)] // Reason: test helpers
93    #![allow(clippy::missing_errors_doc)] // Reason: test helpers
94    #![allow(missing_docs)] // Reason: test code
95
96    use async_trait::async_trait;
97    use fraiseql_core::{
98        db::{
99            WhereClause,
100            traits::DatabaseAdapter,
101            types::{DatabaseType, JsonbValue, PoolMetrics},
102        },
103        error::Result as FraiseQLResult,
104        schema::CompiledSchema,
105    };
106
107    use super::*;
108
109    /// Stub adapter that implements `FromPoolConfig` for testing.
110    #[derive(Debug, Clone)]
111    struct StubPoolAdapter;
112
113    #[async_trait]
114    impl DatabaseAdapter for StubPoolAdapter {
115        async fn execute_where_query(
116            &self,
117            _view: &str,
118            _where_clause: Option<&WhereClause>,
119            _limit: Option<u32>,
120            _offset: Option<u32>,
121            _order_by: Option<&[fraiseql_core::db::types::OrderByClause]>,
122
123            _session_vars: &[(&str, &str)],
124        ) -> FraiseQLResult<Vec<JsonbValue>> {
125            Ok(vec![])
126        }
127
128        async fn execute_with_projection(
129            &self,
130            _view: &str,
131            _projection: Option<&fraiseql_core::schema::SqlProjectionHint>,
132            _where_clause: Option<&WhereClause>,
133            _limit: Option<u32>,
134            _offset: Option<u32>,
135            _order_by: Option<&[fraiseql_core::db::types::OrderByClause]>,
136
137            _session_vars: &[(&str, &str)],
138        ) -> FraiseQLResult<Vec<JsonbValue>> {
139            Ok(vec![])
140        }
141
142        fn database_type(&self) -> DatabaseType {
143            DatabaseType::SQLite
144        }
145
146        async fn health_check(&self) -> FraiseQLResult<()> {
147            Ok(())
148        }
149
150        fn pool_metrics(&self) -> PoolMetrics {
151            PoolMetrics::default()
152        }
153
154        async fn execute_raw_query(
155            &self,
156            _sql: &str,
157        ) -> FraiseQLResult<Vec<std::collections::HashMap<String, serde_json::Value>>> {
158            Ok(vec![])
159        }
160
161        async fn execute_parameterized_aggregate(
162            &self,
163            _sql: &str,
164            _params: &[serde_json::Value],        _session_vars: &[(&str, &str)],
165
166        ) -> FraiseQLResult<Vec<std::collections::HashMap<String, serde_json::Value>>> {
167            Ok(vec![])
168        }
169    }
170
171    #[async_trait]
172    impl FromPoolConfig for StubPoolAdapter {
173        async fn from_pool_config(_config: &TenantPoolConfig) -> FraiseQLResult<Self> {
174            Ok(Self)
175        }
176    }
177
178    fn test_pool_config() -> TenantPoolConfig {
179        TenantPoolConfig {
180            connection_string:    "stub://localhost/test".to_string(),
181            max_connections:      5,
182            connect_timeout_secs: 5,
183            idle_timeout_secs:    300,
184        }
185    }
186
187    #[tokio::test]
188    async fn test_create_tenant_executor_success() {
189        let schema = CompiledSchema::default();
190        let schema_json = serde_json::to_string(&schema).unwrap();
191        let config = test_pool_config();
192
193        let executor =
194            create_tenant_executor::<StubPoolAdapter>(&schema_json, &config).await.unwrap();
195        assert_eq!(executor.schema().types.len(), 0);
196    }
197
198    #[tokio::test]
199    async fn test_create_tenant_executor_invalid_json() {
200        let config = test_pool_config();
201        let Err(err) = create_tenant_executor::<StubPoolAdapter>("not valid json", &config).await
202        else {
203            panic!("expected Err for invalid JSON");
204        };
205        assert!(matches!(err, FraiseQLError::Parse { .. }), "Expected Parse error, got: {err:?}");
206    }
207
208    #[tokio::test]
209    async fn test_create_tenant_executor_bad_format_version() {
210        let schema = CompiledSchema {
211            schema_format_version: Some(999),
212            ..CompiledSchema::default()
213        };
214        let schema_json = serde_json::to_string(&schema).unwrap();
215        let config = test_pool_config();
216
217        let Err(err) = create_tenant_executor::<StubPoolAdapter>(&schema_json, &config).await
218        else {
219            panic!("expected Err for bad format version");
220        };
221        assert!(
222            matches!(err, FraiseQLError::Validation { .. }),
223            "Expected Validation error, got: {err:?}"
224        );
225    }
226
227    /// Adapter that always fails to connect — simulates unreachable DB.
228    #[derive(Debug, Clone)]
229    struct FailingAdapter;
230
231    #[async_trait]
232    impl DatabaseAdapter for FailingAdapter {
233        async fn execute_where_query(
234            &self,
235            _view: &str,
236            _where_clause: Option<&WhereClause>,
237            _limit: Option<u32>,
238            _offset: Option<u32>,
239            _order_by: Option<&[fraiseql_core::db::types::OrderByClause]>,
240
241            _session_vars: &[(&str, &str)],
242        ) -> FraiseQLResult<Vec<JsonbValue>> {
243            Ok(vec![])
244        }
245
246        async fn execute_with_projection(
247            &self,
248            _view: &str,
249            _projection: Option<&fraiseql_core::schema::SqlProjectionHint>,
250            _where_clause: Option<&WhereClause>,
251            _limit: Option<u32>,
252            _offset: Option<u32>,
253            _order_by: Option<&[fraiseql_core::db::types::OrderByClause]>,
254
255            _session_vars: &[(&str, &str)],
256        ) -> FraiseQLResult<Vec<JsonbValue>> {
257            Ok(vec![])
258        }
259
260        fn database_type(&self) -> DatabaseType {
261            DatabaseType::PostgreSQL
262        }
263
264        async fn health_check(&self) -> FraiseQLResult<()> {
265            Err(FraiseQLError::database("connection refused"))
266        }
267
268        fn pool_metrics(&self) -> PoolMetrics {
269            PoolMetrics::default()
270        }
271
272        async fn execute_raw_query(
273            &self,
274            _sql: &str,
275        ) -> FraiseQLResult<Vec<std::collections::HashMap<String, serde_json::Value>>> {
276            Ok(vec![])
277        }
278
279        async fn execute_parameterized_aggregate(
280            &self,
281            _sql: &str,
282            _params: &[serde_json::Value],        _session_vars: &[(&str, &str)],
283
284        ) -> FraiseQLResult<Vec<std::collections::HashMap<String, serde_json::Value>>> {
285            Ok(vec![])
286        }
287    }
288
289    #[async_trait]
290    impl FromPoolConfig for FailingAdapter {
291        async fn from_pool_config(_config: &TenantPoolConfig) -> FraiseQLResult<Self> {
292            Err(FraiseQLError::ConnectionPool {
293                message: "connection refused".to_string(),
294            })
295        }
296    }
297
298    #[tokio::test]
299    async fn test_create_tenant_executor_unreachable_db() {
300        let schema = CompiledSchema::default();
301        let schema_json = serde_json::to_string(&schema).unwrap();
302        let config = test_pool_config();
303
304        let Err(err) = create_tenant_executor::<FailingAdapter>(&schema_json, &config).await else {
305            panic!("expected Err for unreachable DB");
306        };
307        assert!(
308            matches!(err, FraiseQLError::ConnectionPool { .. }),
309            "Expected ConnectionPool error, got: {err:?}"
310        );
311    }
312
313    #[test]
314    fn test_pool_config_defaults() {
315        let json = r#"{"connection_string": "postgres://localhost/test"}"#;
316        let config: TenantPoolConfig = serde_json::from_str(json).unwrap();
317        assert_eq!(config.max_connections, 10);
318        assert_eq!(config.connect_timeout_secs, 5);
319        assert_eq!(config.idle_timeout_secs, 300);
320    }
321}