Skip to main content

fraiseql_server/server/
builder.rs

1//! Server constructors and builder methods.
2
3use std::sync::Arc;
4
5#[cfg(feature = "arrow")]
6use fraiseql_arrow::FraiseQLFlightService;
7use fraiseql_core::{
8    cache::{CacheConfig, CachedDatabaseAdapter, QueryResultCache},
9    db::traits::DatabaseAdapter,
10    runtime::{Executor, SubscriptionManager},
11    schema::CompiledSchema,
12    security::OidcValidator,
13};
14use tracing::{info, warn};
15
16use super::{RateLimiter, Result, Server, ServerConfig, ServerError};
17
18impl<A: DatabaseAdapter + Clone + Send + Sync + 'static> Server<CachedDatabaseAdapter<A>> {
19    /// Create new server.
20    ///
21    /// Relay pagination queries will return a `Validation` error at runtime. Use
22    /// [`Server::with_relay_pagination`] when the adapter implements
23    /// [`RelayDatabaseAdapter`](fraiseql_core::db::traits::RelayDatabaseAdapter)
24    /// and relay support is required.
25    ///
26    /// # Arguments
27    ///
28    /// * `config` - Server configuration
29    /// * `schema` - Compiled GraphQL schema
30    /// * `adapter` - Database adapter
31    /// * `db_pool` — forwarded to the observer runtime; `None` when observers are disabled.
32    ///
33    /// # Errors
34    ///
35    /// Returns error if OIDC validator initialization fails (e.g., unable to
36    /// fetch discovery document or JWKS).
37    ///
38    /// # Panics
39    ///
40    /// Panics if the `adapter` `Arc` has been cloned before calling this constructor
41    /// (refcount > 1). The builder must have exclusive ownership to unwrap the adapter
42    /// for `CachedDatabaseAdapter` construction.
43    ///
44    /// # Example
45    ///
46    /// ```text
47    /// // Requires: running PostgreSQL database and compiled schema file.
48    /// let config = ServerConfig::default();
49    /// let schema = CompiledSchema::from_json(schema_json)?;
50    /// let adapter = Arc::new(PostgresAdapter::new(db_url).await?);
51    ///
52    /// let server = Server::new(config, schema, adapter, None).await?;
53    /// server.serve().await?;
54    /// ```
55    #[allow(clippy::cognitive_complexity)] // Reason: server construction with subsystem initialization (auth, rate-limit, observers, etc.)
56    pub async fn new(
57        config: ServerConfig,
58        schema: CompiledSchema,
59        adapter: Arc<A>,
60        db_pool: Option<sqlx::PgPool>,
61    ) -> Result<Self> {
62        // Validate compiled schema format version before any further setup.
63        // Warns for legacy schemas (no version field); rejects incompatible future versions.
64        if schema.schema_format_version.is_none() {
65            warn!(
66                "Loaded schema has no schema_format_version (pre-v2.1 format). \
67                 Re-compile with the current fraiseql-cli for version compatibility checking."
68            );
69        }
70        schema.validate_format_version().map_err(|msg| {
71            ServerError::ConfigError(format!("Incompatible compiled schema: {msg}"))
72        })?;
73
74        // Read security configs from compiled schema BEFORE schema is moved.
75        #[cfg(feature = "federation")]
76        let circuit_breaker = schema.federation.as_ref().and_then(
77            crate::federation::circuit_breaker::FederationCircuitBreakerManager::from_config,
78        );
79        #[cfg(not(feature = "federation"))]
80        let circuit_breaker: Option<()> = None;
81        #[cfg(not(feature = "federation"))]
82        let _ = &schema.federation;
83        let error_sanitizer = Self::error_sanitizer_from_schema(&schema);
84        #[cfg(feature = "auth")]
85        let state_encryption = Self::state_encryption_from_schema(&schema)?;
86        #[cfg(not(feature = "auth"))]
87        let state_encryption: Option<
88            std::sync::Arc<crate::auth::state_encryption::StateEncryptionService>,
89        > = None;
90        #[cfg(feature = "auth")]
91        let pkce_store = Self::pkce_store_from_schema(&schema, state_encryption.as_ref()).await;
92        #[cfg(not(feature = "auth"))]
93        let pkce_store: Option<std::sync::Arc<crate::auth::PkceStateStore>> = None;
94        #[cfg(feature = "auth")]
95        let oidc_server_client = Self::oidc_server_client_from_schema(&schema);
96        #[cfg(not(feature = "auth"))]
97        let oidc_server_client: Option<std::sync::Arc<crate::auth::OidcServerClient>> = None;
98        let schema_rate_limiter = Self::rate_limiter_from_schema(&schema).await;
99        let api_key_authenticator = crate::api_key::api_key_authenticator_from_schema(&schema);
100        if api_key_authenticator.is_some() {
101            info!("API key authentication enabled");
102        }
103        let revocation_manager = crate::token_revocation::revocation_manager_from_schema(&schema);
104        if revocation_manager.is_some() {
105            info!("Token revocation enabled");
106        }
107        let trusted_docs = Self::trusted_docs_from_schema(&schema);
108
109        // Validate cache + RLS safety at startup.
110        // Cache isolation relies entirely on per-user WHERE clauses in the cache key.
111        // Without RLS, users with the same query and variables share the same cached
112        // response, which can leak data across tenants.
113        if config.cache_enabled && !schema.has_rls_configured() {
114            if schema.is_multi_tenant() {
115                // Multi-tenant + cache + no RLS is a hard safety violation.
116                return Err(ServerError::ConfigError(
117                    "Cache is enabled in a multi-tenant schema but no Row-Level Security \
118                     policies are declared. This would allow cross-tenant cache hits and \
119                     data leakage. In fraiseql.toml, either disable caching with \
120                     [cache] enabled = false, declare [security.rls] policies, or set \
121                     [security] multi_tenant = false to acknowledge single-tenant mode."
122                        .to_string(),
123                ));
124            }
125            // Single-tenant with cache and no RLS: safe, but warn in case of misconfiguration.
126            warn!(
127                "Query-result caching is enabled but no Row-Level Security policies are \
128                 declared in the compiled schema. This is safe for single-tenant deployments. \
129                 For multi-tenant deployments, declare RLS policies and set \
130                 `security.multi_tenant = true` in your schema."
131            );
132        }
133
134        // Build cache from config.
135        let cache_config = CacheConfig::from(config.cache_enabled);
136        let cache = QueryResultCache::new(cache_config);
137
138        // Log cache state before consuming config.
139        if cache_config.enabled {
140            tracing::info!(
141                max_entries   = cache_config.max_entries,
142                ttl_seconds   = cache_config.ttl_seconds,
143                rls_enforcement = ?cache_config.rls_enforcement,
144                "Query result cache: active"
145            );
146        } else {
147            tracing::info!("Query result cache: disabled");
148        }
149
150        // Read subscription config from compiled schema (hooks, limits).
151        let subscriptions_config = schema.subscriptions_config.clone();
152
153        // Unwrap Arc: refcount is 1 here — adapter has not been cloned since being passed in.
154        let inner = Arc::into_inner(adapter)
155            .expect("CachedDatabaseAdapter wrapping requires exclusive Arc ownership at startup");
156        let cached = CachedDatabaseAdapter::new(inner, cache, schema.content_hash())
157            .with_ttl_overrides_from_schema(&schema);
158        let executor = Arc::new(Executor::new(schema.clone(), Arc::new(cached)));
159        let subscription_manager = Arc::new(SubscriptionManager::new(Arc::new(schema)));
160
161        let mut server = Self::from_executor(
162            config,
163            executor,
164            subscription_manager,
165            circuit_breaker,
166            error_sanitizer,
167            state_encryption,
168            pkce_store,
169            oidc_server_client,
170            schema_rate_limiter,
171            api_key_authenticator,
172            revocation_manager,
173            trusted_docs,
174            db_pool,
175        )
176        .await?;
177
178        server.adapter_cache_enabled = cache_config.enabled;
179
180        // Apply pool tuning config from ServerConfig (if present).
181        if let Some(pt) = server.config.pool_tuning.clone() {
182            if pt.enabled {
183                server = server
184                    .with_pool_tuning(pt)
185                    .map_err(|e| ServerError::ConfigError(format!("pool_tuning: {e}")))?;
186            }
187        }
188
189        // Initialize MCP config from compiled schema when the feature is compiled in.
190        #[cfg(feature = "mcp")]
191        if let Some(ref cfg) = server.executor.schema().mcp_config {
192            if cfg.enabled {
193                let tool_count =
194                    crate::mcp::tools::schema_to_tools(server.executor.schema(), cfg).len();
195                info!(
196                    path = %cfg.path,
197                    transport = %cfg.transport,
198                    tools = tool_count,
199                    "MCP server configured"
200                );
201                server.mcp_config = Some(cfg.clone());
202            }
203        }
204
205        // Initialize APQ store when enabled.
206        if server.config.apq_enabled {
207            let apq_store: fraiseql_core::apq::ArcApqStorage =
208                Arc::new(fraiseql_core::apq::InMemoryApqStorage::default());
209            server.apq_store = Some(apq_store);
210            info!("APQ (Automatic Persisted Queries) enabled — in-memory backend");
211        }
212
213        // Apply subscription lifecycle/limits from compiled schema.
214        if let Some(ref subs) = subscriptions_config {
215            if let Some(max) = subs.max_subscriptions_per_connection {
216                server.max_subscriptions_per_connection = Some(max);
217            }
218            if let Some(lifecycle) = crate::subscriptions::WebhookLifecycle::from_config(subs) {
219                server.subscription_lifecycle = Arc::new(lifecycle);
220            }
221        }
222
223        Ok(server)
224    }
225}
226
227impl<A: DatabaseAdapter + Clone + Send + Sync + 'static> Server<A> {
228    /// Shared initialization path used by both `new` and `with_relay_pagination`.
229    ///
230    /// Accepts a pre-built executor so that relay vs. non-relay constructors can supply
231    /// the appropriate variant without duplicating auth/rate-limiter/observer setup.
232    #[allow(clippy::too_many_arguments)]
233    // Reason: internal constructor collects all pre-built subsystems; a builder struct would not
234    // reduce call-site clarity
235    #[allow(clippy::cognitive_complexity)] // Reason: internal constructor that assembles server from pre-built subsystems
236    pub(super) async fn from_executor(
237        config: ServerConfig,
238        executor: Arc<Executor<A>>,
239        subscription_manager: Arc<SubscriptionManager>,
240        #[cfg(feature = "federation")] circuit_breaker: Option<
241            Arc<crate::federation::circuit_breaker::FederationCircuitBreakerManager>,
242        >,
243        #[cfg(not(feature = "federation"))] _circuit_breaker: Option<()>,
244        error_sanitizer: Arc<crate::config::error_sanitization::ErrorSanitizer>,
245        state_encryption: Option<Arc<crate::auth::state_encryption::StateEncryptionService>>,
246        pkce_store: Option<Arc<crate::auth::PkceStateStore>>,
247        oidc_server_client: Option<Arc<crate::auth::OidcServerClient>>,
248        schema_rate_limiter: Option<Arc<RateLimiter>>,
249        api_key_authenticator: Option<Arc<crate::api_key::ApiKeyAuthenticator>>,
250        revocation_manager: Option<Arc<crate::token_revocation::TokenRevocationManager>>,
251        trusted_docs: Option<Arc<crate::trusted_documents::TrustedDocumentStore>>,
252        // `db_pool` is forwarded to the observer runtime; unused when the `observers` feature is
253        // off.
254        #[cfg_attr(not(feature = "observers"), allow(unused_variables))] db_pool: Option<
255            sqlx::PgPool,
256        >,
257    ) -> Result<Self> {
258        // Initialize OIDC validator if auth is configured
259        let oidc_validator = if let Some(ref auth_config) = config.auth {
260            info!(
261                issuer = %auth_config.issuer,
262                "Initializing OIDC authentication"
263            );
264            let validator = OidcValidator::new(auth_config.clone())
265                .await
266                .map_err(|e| ServerError::ConfigError(format!("Failed to initialize OIDC: {e}")))?;
267            Some(Arc::new(validator))
268        } else {
269            None
270        };
271
272        // Initialize rate limiter: compiled schema config takes priority over server config.
273        let rate_limiter = if let Some(rl) = schema_rate_limiter {
274            Some(rl)
275        } else if let Some(ref rate_config) = config.rate_limiting {
276            if rate_config.enabled {
277                info!(
278                    rps_per_ip = rate_config.rps_per_ip,
279                    rps_per_user = rate_config.rps_per_user,
280                    "Initializing rate limiting from server config"
281                );
282                Some(Arc::new(RateLimiter::new(rate_config.clone())))
283            } else {
284                info!("Rate limiting disabled by configuration");
285                None
286            }
287        } else {
288            None
289        };
290
291        // Initialize observer runtime
292        #[cfg(feature = "observers")]
293        let observer_runtime = Self::init_observer_runtime(&config, db_pool.as_ref()).await;
294
295        // Initialize Flight service with OIDC authentication if configured
296        #[cfg(feature = "arrow")]
297        let flight_service = {
298            let mut service = FraiseQLFlightService::new();
299            if let Some(ref validator) = oidc_validator {
300                info!("Enabling OIDC authentication for Arrow Flight");
301                service.set_oidc_validator(validator.clone());
302            } else {
303                info!("Arrow Flight initialized without authentication (dev mode)");
304            }
305            Some(service)
306        };
307
308        // Warn if PKCE is configured but [auth] is missing (no OidcServerClient).
309        #[cfg(feature = "auth")]
310        if pkce_store.is_some() && oidc_server_client.is_none() {
311            tracing::error!(
312                "pkce.enabled = true but [auth] is not configured or OIDC client init failed. \
313                 Auth routes (/auth/start, /auth/callback) will NOT be mounted. \
314                 Add [auth] with discovery_url, client_id, client_secret_env, and \
315                 server_redirect_uri to fraiseql.toml and recompile the schema."
316            );
317        }
318
319        // Refuse to start if FRAISEQL_REQUIRE_REDIS is set and PKCE store is in-memory.
320        #[cfg(feature = "auth")]
321        Self::check_redis_requirement(pkce_store.as_ref())?;
322
323        // Spawn background PKCE state cleanup task (every 5 minutes).
324        #[cfg(feature = "auth")]
325        if let Some(ref store) = pkce_store {
326            use std::time::Duration;
327
328            use tokio::time::MissedTickBehavior;
329            let store_clone = Arc::clone(store);
330            tokio::spawn(async move {
331                let mut ticker = tokio::time::interval(Duration::from_secs(300));
332                ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
333                loop {
334                    ticker.tick().await;
335                    store_clone.cleanup_expired().await;
336                }
337            });
338        }
339
340        // Reason: state_encryption/pkce_store/oidc_server_client are only stored when
341        //         feature = "auth" is enabled; without it they are legitimately unused.
342        #[cfg(not(feature = "auth"))]
343        let _ = (state_encryption, pkce_store, oidc_server_client);
344        Ok(Self {
345            config,
346            executor,
347            subscription_manager,
348            subscription_lifecycle: Arc::new(crate::subscriptions::NoopLifecycle),
349            max_subscriptions_per_connection: None,
350            oidc_validator,
351            rate_limiter,
352            #[cfg(feature = "secrets")]
353            secrets_manager: None,
354            #[cfg(feature = "federation")]
355            circuit_breaker,
356            error_sanitizer,
357            #[cfg(feature = "auth")]
358            state_encryption,
359            #[cfg(feature = "auth")]
360            pkce_store,
361            #[cfg(feature = "auth")]
362            oidc_server_client,
363            api_key_authenticator,
364            revocation_manager,
365            apq_store: None,
366            trusted_docs,
367            #[cfg(feature = "observers")]
368            observer_runtime,
369            #[cfg(feature = "observers")]
370            db_pool,
371            #[cfg(feature = "arrow")]
372            flight_service,
373            #[cfg(feature = "mcp")]
374            mcp_config: None,
375            pool_tuning_config: None,
376            adapter_cache_enabled: false,
377        })
378    }
379
380    /// Set lifecycle hooks for `WebSocket` subscriptions.
381    #[must_use]
382    pub fn with_subscription_lifecycle(
383        mut self,
384        lifecycle: Arc<dyn crate::subscriptions::SubscriptionLifecycle>,
385    ) -> Self {
386        self.subscription_lifecycle = lifecycle;
387        self
388    }
389
390    /// Set maximum subscriptions allowed per `WebSocket` connection.
391    #[must_use]
392    pub const fn with_max_subscriptions_per_connection(mut self, max: u32) -> Self {
393        self.max_subscriptions_per_connection = Some(max);
394        self
395    }
396
397    /// Enable adaptive connection pool sizing.
398    ///
399    /// When `config.enabled` is `true`, the server will spawn a background
400    /// polling task that samples pool metrics and recommends or applies resizes.
401    ///
402    /// # Errors
403    ///
404    /// Returns an error string if the configuration fails validation (e.g. `min >= max`).
405    pub fn with_pool_tuning(
406        mut self,
407        config: crate::config::pool_tuning::PoolPressureMonitorConfig,
408    ) -> std::result::Result<Self, String> {
409        config.validate()?;
410        self.pool_tuning_config = Some(config);
411        Ok(self)
412    }
413
414    /// Set secrets manager for the server.
415    ///
416    /// This allows attaching a secrets manager after server creation for credential management.
417    #[cfg(feature = "secrets")]
418    pub fn set_secrets_manager(&mut self, manager: Arc<crate::secrets_manager::SecretsManager>) {
419        self.secrets_manager = Some(manager);
420        info!("Secrets manager attached to server");
421    }
422
423    /// Serve MCP over stdio (stdin/stdout) instead of HTTP.
424    ///
425    /// This is used when `FRAISEQL_MCP_STDIO=1` is set.  The server reads JSON-RPC
426    /// messages from stdin and writes responses to stdout, following the MCP stdio
427    /// transport specification.
428    ///
429    /// # Errors
430    ///
431    /// Returns an error if MCP is not configured or the stdio transport fails.
432    #[cfg(feature = "mcp")]
433    pub async fn serve_mcp_stdio(self) -> Result<()> {
434        use rmcp::ServiceExt;
435
436        let mcp_cfg = self.mcp_config.ok_or_else(|| {
437            ServerError::ConfigError(
438                "FRAISEQL_MCP_STDIO=1 but MCP is not configured. \
439                 Add [mcp] enabled = true to fraiseql.toml and recompile the schema."
440                    .into(),
441            )
442        })?;
443
444        let schema = Arc::new(self.executor.schema().clone());
445        let executor = self.executor.clone();
446
447        let service = crate::mcp::handler::FraiseQLMcpService::new(schema, executor, mcp_cfg);
448
449        info!("MCP stdio transport starting — reading from stdin, writing to stdout");
450
451        let running = service
452            .serve((tokio::io::stdin(), tokio::io::stdout()))
453            .await
454            .map_err(|e| ServerError::ConfigError(format!("MCP stdio init failed: {e}")))?;
455
456        running
457            .waiting()
458            .await
459            .map_err(|e| ServerError::ConfigError(format!("MCP stdio error: {e}")))?;
460
461        Ok(())
462    }
463}