Skip to main content

fraiseql_server/tenancy/
pool_factory.rs

//! Tenant pool creation and executor construction.
//!
//! Provides [`TenantPoolConfig`] and [`create_tenant_executor`] to build a
//! fully formed `Executor<A>` from a compiled schema JSON string and database
//! connection configuration. Used by the management API (Phase 03) to register
//! tenants at runtime.
7
8use std::sync::Arc;
9
10use fraiseql_core::{db::traits::DatabaseAdapter, runtime::Executor, schema::CompiledSchema};
11use fraiseql_error::{FraiseQLError, Result};
12use serde::Deserialize;
13
14/// Connection configuration for a tenant database pool.
15#[derive(Debug, Clone, Deserialize)]
16pub struct TenantPoolConfig {
17    /// Database connection string (e.g. `postgres://user:pass@host:5432/db`).
18    pub connection_string:    String,
19    /// Maximum number of connections in the pool.
20    #[serde(default = "default_max_connections")]
21    pub max_connections:      u32,
22    /// Connection timeout in seconds.
23    #[serde(default = "default_connect_timeout")]
24    pub connect_timeout_secs: u64,
25    /// Idle connection timeout in seconds.
26    #[serde(default = "default_idle_timeout")]
27    pub idle_timeout_secs:    u64,
28}
29
/// Pool size applied when `max_connections` is absent from the input.
const DEFAULT_MAX_CONNECTIONS: u32 = 10;
/// Connect timeout, in seconds, applied when `connect_timeout_secs` is absent.
const DEFAULT_CONNECT_TIMEOUT_SECS: u64 = 5;
/// Idle timeout, in seconds, applied when `idle_timeout_secs` is absent.
const DEFAULT_IDLE_TIMEOUT_SECS: u64 = 300;

/// Serde default provider for `TenantPoolConfig::max_connections`.
const fn default_max_connections() -> u32 {
    DEFAULT_MAX_CONNECTIONS
}

/// Serde default provider for `TenantPoolConfig::connect_timeout_secs`.
const fn default_connect_timeout() -> u64 {
    DEFAULT_CONNECT_TIMEOUT_SECS
}

/// Serde default provider for `TenantPoolConfig::idle_timeout_secs`.
const fn default_idle_timeout() -> u64 {
    DEFAULT_IDLE_TIMEOUT_SECS
}
39
40/// Trait for database adapters that can be created from a connection string.
41///
42/// Implemented by adapters that support dynamic pool creation at runtime
43/// (as opposed to static initialization at server startup).
44#[async_trait::async_trait]
45pub trait FromPoolConfig: DatabaseAdapter + Sized {
46    /// Create a new adapter from connection configuration.
47    ///
48    /// # Errors
49    ///
50    /// Returns `FraiseQLError::ConnectionPool` or `FraiseQLError::Database`
51    /// if the connection cannot be established.
52    async fn from_pool_config(config: &TenantPoolConfig) -> Result<Self>;
53}
54
55/// Creates a complete tenant executor from a compiled schema JSON string and
56/// connection configuration.
57///
58/// This is the primary entry point for tenant registration: it parses the schema,
59/// validates its format version, creates a database pool, and assembles an
60/// `Executor<A>` with both baked in.
61///
62/// # Errors
63///
64/// Returns `FraiseQLError::Parse` if the schema JSON is invalid.
65/// Returns `FraiseQLError::Validation` if the schema format version is unsupported.
66/// Returns `FraiseQLError::ConnectionPool` / `FraiseQLError::Database` if the pool
67/// cannot be created.
68pub async fn create_tenant_executor<A: FromPoolConfig>(
69    schema_json: &str,
70    pool_config: &TenantPoolConfig,
71) -> Result<Arc<Executor<A>>> {
72    // 1. Parse and validate schema
73    let schema = CompiledSchema::from_json(schema_json).map_err(|e| FraiseQLError::Parse {
74        message:  format!("Invalid compiled schema JSON: {e}"),
75        location: String::new(),
76    })?;
77
78    schema
79        .validate_format_version()
80        .map_err(|msg| FraiseQLError::validation(format!("Incompatible compiled schema: {msg}")))?;
81
82    // 2. Create database adapter/pool
83    let adapter = A::from_pool_config(pool_config).await?;
84
85    // 3. Assemble executor
86    Ok(Arc::new(Executor::new(schema, Arc::new(adapter))))
87}
88
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used)] // Reason: test code, panics acceptable
    #![allow(clippy::missing_panics_doc)] // Reason: test helpers
    #![allow(clippy::missing_errors_doc)] // Reason: test helpers
    #![allow(missing_docs)] // Reason: test code

    use std::collections::HashMap;

    use async_trait::async_trait;
    use fraiseql_core::{
        db::{
            WhereClause,
            traits::DatabaseAdapter,
            types::{DatabaseType, JsonbValue, OrderByClause, PoolMetrics},
        },
        error::Result as FraiseQLResult,
        schema::{CompiledSchema, SqlProjectionHint},
    };

    use super::*;

    /// Row shape returned by the raw-query style adapter methods.
    type RawRows = Vec<HashMap<String, serde_json::Value>>;

    /// Adapter double whose construction always succeeds and whose query
    /// methods all return empty results.
    #[derive(Debug, Clone)]
    struct StubPoolAdapter;

    #[async_trait]
    impl DatabaseAdapter for StubPoolAdapter {
        async fn execute_where_query(
            &self,
            _view: &str,
            _where_clause: Option<&WhereClause>,
            _limit: Option<u32>,
            _offset: Option<u32>,
            _order_by: Option<&[OrderByClause]>,
        ) -> FraiseQLResult<Vec<JsonbValue>> {
            Ok(Vec::new())
        }

        async fn execute_with_projection(
            &self,
            _view: &str,
            _projection: Option<&SqlProjectionHint>,
            _where_clause: Option<&WhereClause>,
            _limit: Option<u32>,
            _offset: Option<u32>,
            _order_by: Option<&[OrderByClause]>,
        ) -> FraiseQLResult<Vec<JsonbValue>> {
            Ok(Vec::new())
        }

        fn database_type(&self) -> DatabaseType {
            DatabaseType::SQLite
        }

        async fn health_check(&self) -> FraiseQLResult<()> {
            Ok(())
        }

        fn pool_metrics(&self) -> PoolMetrics {
            PoolMetrics::default()
        }

        async fn execute_raw_query(&self, _sql: &str) -> FraiseQLResult<RawRows> {
            Ok(Vec::new())
        }

        async fn execute_parameterized_aggregate(
            &self,
            _sql: &str,
            _params: &[serde_json::Value],
        ) -> FraiseQLResult<RawRows> {
            Ok(Vec::new())
        }
    }

    #[async_trait]
    impl FromPoolConfig for StubPoolAdapter {
        async fn from_pool_config(_config: &TenantPoolConfig) -> FraiseQLResult<Self> {
            Ok(Self)
        }
    }

    /// Pool configuration shared by the executor tests; the stub adapters
    /// ignore its contents.
    fn test_pool_config() -> TenantPoolConfig {
        TenantPoolConfig {
            connection_string:    "stub://localhost/test".to_string(),
            max_connections:      5,
            connect_timeout_secs: 5,
            idle_timeout_secs:    300,
        }
    }

    /// Serializes a default `CompiledSchema` to the JSON form the factory expects.
    fn default_schema_json() -> String {
        serde_json::to_string(&CompiledSchema::default()).unwrap()
    }

    #[tokio::test]
    async fn test_create_tenant_executor_success() {
        let json = default_schema_json();
        let pool = test_pool_config();

        let built = create_tenant_executor::<StubPoolAdapter>(&json, &pool).await.unwrap();

        let type_count = built.schema().types.len();
        assert_eq!(type_count, 0);
    }

    #[tokio::test]
    async fn test_create_tenant_executor_invalid_json() {
        let pool = test_pool_config();

        let failure = match create_tenant_executor::<StubPoolAdapter>("not valid json", &pool).await
        {
            Ok(_) => panic!("expected Err for invalid JSON"),
            Err(failure) => failure,
        };

        assert!(
            matches!(failure, FraiseQLError::Parse { .. }),
            "Expected Parse error, got: {failure:?}"
        );
    }

    #[tokio::test]
    async fn test_create_tenant_executor_bad_format_version() {
        // A format version of 999 is expected to be rejected by validation.
        let unsupported = CompiledSchema {
            schema_format_version: Some(999),
            ..CompiledSchema::default()
        };
        let json = serde_json::to_string(&unsupported).unwrap();
        let pool = test_pool_config();

        let failure = match create_tenant_executor::<StubPoolAdapter>(&json, &pool).await {
            Ok(_) => panic!("expected Err for bad format version"),
            Err(failure) => failure,
        };

        assert!(
            matches!(failure, FraiseQLError::Validation { .. }),
            "Expected Validation error, got: {failure:?}"
        );
    }

    /// Adapter double whose construction always fails with a connection-pool
    /// error, standing in for an unreachable database.
    #[derive(Debug, Clone)]
    struct FailingAdapter;

    #[async_trait]
    impl DatabaseAdapter for FailingAdapter {
        async fn execute_where_query(
            &self,
            _view: &str,
            _where_clause: Option<&WhereClause>,
            _limit: Option<u32>,
            _offset: Option<u32>,
            _order_by: Option<&[OrderByClause]>,
        ) -> FraiseQLResult<Vec<JsonbValue>> {
            Ok(Vec::new())
        }

        async fn execute_with_projection(
            &self,
            _view: &str,
            _projection: Option<&SqlProjectionHint>,
            _where_clause: Option<&WhereClause>,
            _limit: Option<u32>,
            _offset: Option<u32>,
            _order_by: Option<&[OrderByClause]>,
        ) -> FraiseQLResult<Vec<JsonbValue>> {
            Ok(Vec::new())
        }

        fn database_type(&self) -> DatabaseType {
            DatabaseType::PostgreSQL
        }

        async fn health_check(&self) -> FraiseQLResult<()> {
            Err(FraiseQLError::database("connection refused"))
        }

        fn pool_metrics(&self) -> PoolMetrics {
            PoolMetrics::default()
        }

        async fn execute_raw_query(&self, _sql: &str) -> FraiseQLResult<RawRows> {
            Ok(Vec::new())
        }

        async fn execute_parameterized_aggregate(
            &self,
            _sql: &str,
            _params: &[serde_json::Value],
        ) -> FraiseQLResult<RawRows> {
            Ok(Vec::new())
        }
    }

    #[async_trait]
    impl FromPoolConfig for FailingAdapter {
        async fn from_pool_config(_config: &TenantPoolConfig) -> FraiseQLResult<Self> {
            let message = "connection refused".to_string();
            Err(FraiseQLError::ConnectionPool { message })
        }
    }

    #[tokio::test]
    async fn test_create_tenant_executor_unreachable_db() {
        let json = default_schema_json();
        let pool = test_pool_config();

        let failure = match create_tenant_executor::<FailingAdapter>(&json, &pool).await {
            Ok(_) => panic!("expected Err for unreachable DB"),
            Err(failure) => failure,
        };

        assert!(
            matches!(failure, FraiseQLError::ConnectionPool { .. }),
            "Expected ConnectionPool error, got: {failure:?}"
        );
    }

    #[test]
    fn test_pool_config_defaults() {
        // Only the required field is supplied; the rest must come from the
        // serde default functions.
        let minimal = r#"{"connection_string": "postgres://localhost/test"}"#;
        let parsed: TenantPoolConfig = serde_json::from_str(minimal).unwrap();

        assert_eq!(parsed.max_connections, 10);
        assert_eq!(parsed.connect_timeout_secs, 5);
        assert_eq!(parsed.idle_timeout_secs, 300);
    }
}