Skip to main content

fraiseql_server/server/
builder.rs

1//! Server constructors and builder methods.
2
3use std::sync::Arc;
4
5#[cfg(feature = "arrow")]
6use fraiseql_arrow::FraiseQLFlightService;
7use fraiseql_core::{
8    db::traits::DatabaseAdapter,
9    runtime::{Executor, SubscriptionManager},
10    schema::CompiledSchema,
11    security::OidcValidator,
12};
13use tracing::{info, warn};
14
15use super::{RateLimiter, Result, Server, ServerConfig, ServerError};
16
17impl<A: DatabaseAdapter + Clone + Send + Sync + 'static> Server<A> {
18    /// Create new server.
19    ///
20    /// Relay pagination queries will return a `Validation` error at runtime. Use
21    /// [`Server::with_relay_pagination`] when the adapter implements
22    /// [`RelayDatabaseAdapter`](fraiseql_core::db::traits::RelayDatabaseAdapter)
23    /// and relay support is required.
24    ///
25    /// # Arguments
26    ///
27    /// * `config` - Server configuration
28    /// * `schema` - Compiled GraphQL schema
29    /// * `adapter` - Database adapter
30    /// * `db_pool` — forwarded to the observer runtime; `None` when observers are disabled.
31    ///
32    /// # Errors
33    ///
34    /// Returns error if OIDC validator initialization fails (e.g., unable to
35    /// fetch discovery document or JWKS).
36    ///
37    /// # Example
38    ///
39    /// ```text
40    /// // Requires: running PostgreSQL database and compiled schema file.
41    /// let config = ServerConfig::default();
42    /// let schema = CompiledSchema::from_json(schema_json)?;
43    /// let adapter = Arc::new(PostgresAdapter::new(db_url).await?);
44    ///
45    /// let server = Server::new(config, schema, adapter, None).await?;
46    /// server.serve().await?;
47    /// ```
48    #[allow(clippy::cognitive_complexity)] // Reason: server construction with subsystem initialization (auth, rate-limit, observers, etc.)
49    pub async fn new(
50        config: ServerConfig,
51        schema: CompiledSchema,
52        adapter: Arc<A>,
53        db_pool: Option<sqlx::PgPool>,
54    ) -> Result<Self> {
55        // Validate compiled schema format version before any further setup.
56        // Warns for legacy schemas (no version field); rejects incompatible future versions.
57        if schema.schema_format_version.is_none() {
58            warn!(
59                "Loaded schema has no schema_format_version (pre-v2.1 format). \
60                 Re-compile with the current fraiseql-cli for version compatibility checking."
61            );
62        }
63        schema.validate_format_version().map_err(|msg| {
64            ServerError::ConfigError(format!("Incompatible compiled schema: {msg}"))
65        })?;
66
67        // Read security configs from compiled schema BEFORE schema is moved.
68        #[cfg(feature = "federation")]
69        let circuit_breaker = schema.federation.as_ref().and_then(
70            crate::federation::circuit_breaker::FederationCircuitBreakerManager::from_config,
71        );
72        #[cfg(not(feature = "federation"))]
73        let circuit_breaker: Option<()> = None;
74        #[cfg(not(feature = "federation"))]
75        let _ = &schema.federation;
76        let error_sanitizer = Self::error_sanitizer_from_schema(&schema);
77        #[cfg(feature = "auth")]
78        let state_encryption = Self::state_encryption_from_schema(&schema)?;
79        #[cfg(not(feature = "auth"))]
80        let state_encryption: Option<
81            std::sync::Arc<crate::auth::state_encryption::StateEncryptionService>,
82        > = None;
83        #[cfg(feature = "auth")]
84        let pkce_store = Self::pkce_store_from_schema(&schema, state_encryption.as_ref()).await;
85        #[cfg(not(feature = "auth"))]
86        let pkce_store: Option<std::sync::Arc<crate::auth::PkceStateStore>> = None;
87        #[cfg(feature = "auth")]
88        let oidc_server_client = Self::oidc_server_client_from_schema(&schema);
89        #[cfg(not(feature = "auth"))]
90        let oidc_server_client: Option<std::sync::Arc<crate::auth::OidcServerClient>> = None;
91        let schema_rate_limiter = Self::rate_limiter_from_schema(&schema).await;
92        let api_key_authenticator = crate::api_key::api_key_authenticator_from_schema(&schema);
93        if api_key_authenticator.is_some() {
94            info!("API key authentication enabled");
95        }
96        let revocation_manager = crate::token_revocation::revocation_manager_from_schema(&schema);
97        if revocation_manager.is_some() {
98            info!("Token revocation enabled");
99        }
100        let trusted_docs = Self::trusted_docs_from_schema(&schema);
101
102        // Validate cache + RLS safety at startup.
103        // Cache isolation relies entirely on per-user WHERE clauses in the cache key.
104        // Without RLS, users with the same query and variables share the same cached
105        // response, which can leak data across tenants.
106        if config.cache_enabled && !schema.has_rls_configured() {
107            if schema.is_multi_tenant() {
108                // Multi-tenant + cache + no RLS is a hard safety violation.
109                return Err(ServerError::ConfigError(
110                    "Cache is enabled in a multi-tenant schema but no Row-Level Security \
111                     policies are declared. This would allow cross-tenant cache hits and \
112                     data leakage. In fraiseql.toml, either disable caching with \
113                     [cache] enabled = false, declare [security.rls] policies, or set \
114                     [security] multi_tenant = false to acknowledge single-tenant mode."
115                        .to_string(),
116                ));
117            }
118            // Single-tenant with cache and no RLS: safe, but warn in case of misconfiguration.
119            warn!(
120                "Query-result caching is enabled but no Row-Level Security policies are \
121                 declared in the compiled schema. This is safe for single-tenant deployments. \
122                 For multi-tenant deployments, declare RLS policies and set \
123                 `security.multi_tenant = true` in your schema."
124            );
125        }
126
127        // Read subscription config from compiled schema (hooks, limits).
128        let subscriptions_config = schema.subscriptions_config.clone();
129
130        let executor = Arc::new(Executor::new(schema.clone(), adapter));
131        let subscription_manager = Arc::new(SubscriptionManager::new(Arc::new(schema)));
132
133        let mut server = Self::from_executor(
134            config,
135            executor,
136            subscription_manager,
137            circuit_breaker,
138            error_sanitizer,
139            state_encryption,
140            pkce_store,
141            oidc_server_client,
142            schema_rate_limiter,
143            api_key_authenticator,
144            revocation_manager,
145            trusted_docs,
146            db_pool,
147        )
148        .await?;
149
150        // Apply pool tuning config from ServerConfig (if present).
151        if let Some(pt) = server.config.pool_tuning.clone() {
152            if pt.enabled {
153                server = server
154                    .with_pool_tuning(pt)
155                    .map_err(|e| ServerError::ConfigError(format!("pool_tuning: {e}")))?;
156            }
157        }
158
159        // Initialize MCP config from compiled schema when the feature is compiled in.
160        #[cfg(feature = "mcp")]
161        if let Some(ref cfg) = server.executor.schema().mcp_config {
162            if cfg.enabled {
163                let tool_count =
164                    crate::mcp::tools::schema_to_tools(server.executor.schema(), cfg).len();
165                info!(
166                    path = %cfg.path,
167                    transport = %cfg.transport,
168                    tools = tool_count,
169                    "MCP server configured"
170                );
171                server.mcp_config = Some(cfg.clone());
172            }
173        }
174
175        // Initialize APQ store when enabled.
176        if server.config.apq_enabled {
177            let apq_store: fraiseql_core::apq::ArcApqStorage =
178                Arc::new(fraiseql_core::apq::InMemoryApqStorage::default());
179            server.apq_store = Some(apq_store);
180            info!("APQ (Automatic Persisted Queries) enabled — in-memory backend");
181        }
182
183        // Apply subscription lifecycle/limits from compiled schema.
184        if let Some(ref subs) = subscriptions_config {
185            if let Some(max) = subs.max_subscriptions_per_connection {
186                server.max_subscriptions_per_connection = Some(max);
187            }
188            if let Some(lifecycle) = crate::subscriptions::WebhookLifecycle::from_config(subs) {
189                server.subscription_lifecycle = Arc::new(lifecycle);
190            }
191        }
192
193        Ok(server)
194    }
195
196    /// Shared initialization path used by both `new` and `with_relay_pagination`.
197    ///
198    /// Accepts a pre-built executor so that relay vs. non-relay constructors can supply
199    /// the appropriate variant without duplicating auth/rate-limiter/observer setup.
200    #[allow(clippy::too_many_arguments)]
201    // Reason: internal constructor collects all pre-built subsystems; a builder struct would not
202    // reduce call-site clarity
203    #[allow(clippy::cognitive_complexity)] // Reason: internal constructor that assembles server from pre-built subsystems
204    pub(super) async fn from_executor(
205        config: ServerConfig,
206        executor: Arc<Executor<A>>,
207        subscription_manager: Arc<SubscriptionManager>,
208        #[cfg(feature = "federation")] circuit_breaker: Option<
209            Arc<crate::federation::circuit_breaker::FederationCircuitBreakerManager>,
210        >,
211        #[cfg(not(feature = "federation"))] _circuit_breaker: Option<()>,
212        error_sanitizer: Arc<crate::config::error_sanitization::ErrorSanitizer>,
213        state_encryption: Option<Arc<crate::auth::state_encryption::StateEncryptionService>>,
214        pkce_store: Option<Arc<crate::auth::PkceStateStore>>,
215        oidc_server_client: Option<Arc<crate::auth::OidcServerClient>>,
216        schema_rate_limiter: Option<Arc<RateLimiter>>,
217        api_key_authenticator: Option<Arc<crate::api_key::ApiKeyAuthenticator>>,
218        revocation_manager: Option<Arc<crate::token_revocation::TokenRevocationManager>>,
219        trusted_docs: Option<Arc<crate::trusted_documents::TrustedDocumentStore>>,
220        // `db_pool` is forwarded to the observer runtime; unused when the `observers` feature is
221        // off.
222        #[cfg_attr(not(feature = "observers"), allow(unused_variables))] db_pool: Option<
223            sqlx::PgPool,
224        >,
225    ) -> Result<Self> {
226        // Initialize OIDC validator if auth is configured
227        let oidc_validator = if let Some(ref auth_config) = config.auth {
228            info!(
229                issuer = %auth_config.issuer,
230                "Initializing OIDC authentication"
231            );
232            let validator = OidcValidator::new(auth_config.clone())
233                .await
234                .map_err(|e| ServerError::ConfigError(format!("Failed to initialize OIDC: {e}")))?;
235            Some(Arc::new(validator))
236        } else {
237            None
238        };
239
240        // Initialize rate limiter: compiled schema config takes priority over server config.
241        let rate_limiter = if let Some(rl) = schema_rate_limiter {
242            Some(rl)
243        } else if let Some(ref rate_config) = config.rate_limiting {
244            if rate_config.enabled {
245                info!(
246                    rps_per_ip = rate_config.rps_per_ip,
247                    rps_per_user = rate_config.rps_per_user,
248                    "Initializing rate limiting from server config"
249                );
250                Some(Arc::new(RateLimiter::new(rate_config.clone())))
251            } else {
252                info!("Rate limiting disabled by configuration");
253                None
254            }
255        } else {
256            None
257        };
258
259        // Initialize observer runtime
260        #[cfg(feature = "observers")]
261        let observer_runtime = Self::init_observer_runtime(&config, db_pool.as_ref()).await;
262
263        // Initialize Flight service with OIDC authentication if configured
264        #[cfg(feature = "arrow")]
265        let flight_service = {
266            let mut service = FraiseQLFlightService::new();
267            if let Some(ref validator) = oidc_validator {
268                info!("Enabling OIDC authentication for Arrow Flight");
269                service.set_oidc_validator(validator.clone());
270            } else {
271                info!("Arrow Flight initialized without authentication (dev mode)");
272            }
273            Some(service)
274        };
275
276        // Warn if PKCE is configured but [auth] is missing (no OidcServerClient).
277        #[cfg(feature = "auth")]
278        if pkce_store.is_some() && oidc_server_client.is_none() {
279            tracing::error!(
280                "pkce.enabled = true but [auth] is not configured or OIDC client init failed. \
281                 Auth routes (/auth/start, /auth/callback) will NOT be mounted. \
282                 Add [auth] with discovery_url, client_id, client_secret_env, and \
283                 server_redirect_uri to fraiseql.toml and recompile the schema."
284            );
285        }
286
287        // Refuse to start if FRAISEQL_REQUIRE_REDIS is set and PKCE store is in-memory.
288        #[cfg(feature = "auth")]
289        Self::check_redis_requirement(pkce_store.as_ref())?;
290
291        // Spawn background PKCE state cleanup task (every 5 minutes).
292        #[cfg(feature = "auth")]
293        if let Some(ref store) = pkce_store {
294            use std::time::Duration;
295
296            use tokio::time::MissedTickBehavior;
297            let store_clone = Arc::clone(store);
298            tokio::spawn(async move {
299                let mut ticker = tokio::time::interval(Duration::from_secs(300));
300                ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
301                loop {
302                    ticker.tick().await;
303                    store_clone.cleanup_expired().await;
304                }
305            });
306        }
307
308        // Reason: state_encryption/pkce_store/oidc_server_client are only stored when
309        //         feature = "auth" is enabled; without it they are legitimately unused.
310        #[cfg(not(feature = "auth"))]
311        let _ = (state_encryption, pkce_store, oidc_server_client);
312        Ok(Self {
313            config,
314            executor,
315            subscription_manager,
316            subscription_lifecycle: Arc::new(crate::subscriptions::NoopLifecycle),
317            max_subscriptions_per_connection: None,
318            oidc_validator,
319            rate_limiter,
320            #[cfg(feature = "secrets")]
321            secrets_manager: None,
322            #[cfg(feature = "federation")]
323            circuit_breaker,
324            error_sanitizer,
325            #[cfg(feature = "auth")]
326            state_encryption,
327            #[cfg(feature = "auth")]
328            pkce_store,
329            #[cfg(feature = "auth")]
330            oidc_server_client,
331            api_key_authenticator,
332            revocation_manager,
333            apq_store: None,
334            trusted_docs,
335            #[cfg(feature = "observers")]
336            observer_runtime,
337            #[cfg(feature = "observers")]
338            db_pool,
339            #[cfg(feature = "arrow")]
340            flight_service,
341            #[cfg(feature = "mcp")]
342            mcp_config: None,
343            pool_tuning_config: None,
344        })
345    }
346
347    /// Set lifecycle hooks for `WebSocket` subscriptions.
348    #[must_use]
349    pub fn with_subscription_lifecycle(
350        mut self,
351        lifecycle: Arc<dyn crate::subscriptions::SubscriptionLifecycle>,
352    ) -> Self {
353        self.subscription_lifecycle = lifecycle;
354        self
355    }
356
357    /// Set maximum subscriptions allowed per `WebSocket` connection.
358    #[must_use]
359    pub const fn with_max_subscriptions_per_connection(mut self, max: u32) -> Self {
360        self.max_subscriptions_per_connection = Some(max);
361        self
362    }
363
364    /// Enable adaptive connection pool sizing.
365    ///
366    /// When `config.enabled` is `true`, the server will spawn a background
367    /// polling task that samples pool metrics and recommends or applies resizes.
368    ///
369    /// # Errors
370    ///
371    /// Returns an error string if the configuration fails validation (e.g. `min >= max`).
372    pub fn with_pool_tuning(
373        mut self,
374        config: crate::config::pool_tuning::PoolPressureMonitorConfig,
375    ) -> std::result::Result<Self, String> {
376        config.validate()?;
377        self.pool_tuning_config = Some(config);
378        Ok(self)
379    }
380
381    /// Set secrets manager for the server.
382    ///
383    /// This allows attaching a secrets manager after server creation for credential management.
384    #[cfg(feature = "secrets")]
385    pub fn set_secrets_manager(&mut self, manager: Arc<crate::secrets_manager::SecretsManager>) {
386        self.secrets_manager = Some(manager);
387        info!("Secrets manager attached to server");
388    }
389
390    /// Serve MCP over stdio (stdin/stdout) instead of HTTP.
391    ///
392    /// This is used when `FRAISEQL_MCP_STDIO=1` is set.  The server reads JSON-RPC
393    /// messages from stdin and writes responses to stdout, following the MCP stdio
394    /// transport specification.
395    ///
396    /// # Errors
397    ///
398    /// Returns an error if MCP is not configured or the stdio transport fails.
399    #[cfg(feature = "mcp")]
400    pub async fn serve_mcp_stdio(self) -> Result<()> {
401        use rmcp::ServiceExt;
402
403        let mcp_cfg = self.mcp_config.ok_or_else(|| {
404            ServerError::ConfigError(
405                "FRAISEQL_MCP_STDIO=1 but MCP is not configured. \
406                 Add [mcp] enabled = true to fraiseql.toml and recompile the schema."
407                    .into(),
408            )
409        })?;
410
411        let schema = Arc::new(self.executor.schema().clone());
412        let executor = self.executor.clone();
413
414        let service = crate::mcp::handler::FraiseQLMcpService::new(schema, executor, mcp_cfg);
415
416        info!("MCP stdio transport starting — reading from stdin, writing to stdout");
417
418        let running = service
419            .serve((tokio::io::stdin(), tokio::io::stdout()))
420            .await
421            .map_err(|e| ServerError::ConfigError(format!("MCP stdio init failed: {e}")))?;
422
423        running
424            .waiting()
425            .await
426            .map_err(|e| ServerError::ConfigError(format!("MCP stdio error: {e}")))?;
427
428        Ok(())
429    }
430}