Skip to main content

fraiseql_server/server/
builder.rs

1//! Server constructors and builder methods.
2
3use std::sync::Arc;
4
5#[cfg(feature = "arrow")]
6use fraiseql_arrow::FraiseQLFlightService;
7use fraiseql_core::{
8    cache::{CacheConfig, CachedDatabaseAdapter, QueryResultCache},
9    db::traits::DatabaseAdapter,
10    runtime::{Executor, SubscriptionManager},
11    schema::CompiledSchema,
12    security::{AuthConfig, AuthMiddleware, OidcValidator},
13};
14use tracing::{info, warn};
15
16use super::{RateLimiter, Result, Server, ServerConfig, ServerError};
17
18/// Build an HS256 validator from the server config, if configured.
19pub(super) fn build_hs256_auth(config: &ServerConfig) -> Result<Option<Arc<AuthMiddleware>>> {
20    let Some(ref hs) = config.auth_hs256 else {
21        return Ok(None);
22    };
23    let secret = hs
24        .load_secret()
25        .map_err(|e| ServerError::ConfigError(format!("Failed to initialize HS256 auth: {e}")))?;
26    let mut auth_config = AuthConfig::with_hs256(&secret);
27    if let Some(ref iss) = hs.issuer {
28        auth_config = auth_config.with_issuer(iss);
29    }
30    if let Some(ref aud) = hs.audience {
31        auth_config = auth_config.with_audience(aud);
32    }
33    info!(
34        secret_env = %hs.secret_env,
35        issuer = ?hs.issuer,
36        audience = ?hs.audience,
37        "Initializing HS256 authentication (local validation, no network)"
38    );
39    Ok(Some(Arc::new(AuthMiddleware::from_config(auth_config))))
40}
41
42impl<A: DatabaseAdapter + Clone + Send + Sync + 'static> Server<CachedDatabaseAdapter<A>> {
43    /// Create new server.
44    ///
45    /// Relay pagination queries will return a `Validation` error at runtime. Use
46    /// [`Server::with_relay_pagination`] when the adapter implements
47    /// [`RelayDatabaseAdapter`](fraiseql_core::db::traits::RelayDatabaseAdapter)
48    /// and relay support is required.
49    ///
50    /// # Arguments
51    ///
52    /// * `config` - Server configuration
53    /// * `schema` - Compiled GraphQL schema
54    /// * `adapter` - Database adapter
55    /// * `db_pool` — forwarded to the observer runtime; `None` when observers are disabled.
56    ///
57    /// # Errors
58    ///
59    /// Returns error if OIDC validator initialization fails (e.g., unable to
60    /// fetch discovery document or JWKS).
61    ///
62    /// # Panics
63    ///
64    /// Panics if the `adapter` `Arc` has been cloned before calling this constructor
65    /// (refcount > 1). The builder must have exclusive ownership to unwrap the adapter
66    /// for `CachedDatabaseAdapter` construction.
67    ///
68    /// # Example
69    ///
70    /// ```text
71    /// // Requires: running PostgreSQL database and compiled schema file.
72    /// let config = ServerConfig::default();
73    /// let schema = CompiledSchema::from_json(schema_json)?;
74    /// let adapter = Arc::new(PostgresAdapter::new(db_url).await?);
75    ///
76    /// let server = Server::new(config, schema, adapter, None).await?;
77    /// server.serve().await?;
78    /// ```
79    #[allow(clippy::cognitive_complexity)] // Reason: server construction with subsystem initialization (auth, rate-limit, observers, etc.)
80    pub async fn new(
81        config: ServerConfig,
82        schema: CompiledSchema,
83        adapter: Arc<A>,
84        db_pool: Option<sqlx::PgPool>,
85    ) -> Result<Self> {
86        // Validate compiled schema format version before any further setup.
87        // Warns for legacy schemas (no version field); rejects incompatible future versions.
88        if schema.schema_format_version.is_none() {
89            warn!(
90                "Loaded schema has no schema_format_version (pre-v2.1 format). \
91                 Re-compile with the current fraiseql-cli for version compatibility checking."
92            );
93        }
94        schema.validate_format_version().map_err(|msg| {
95            ServerError::ConfigError(format!("Incompatible compiled schema: {msg}"))
96        })?;
97
98        // Read security configs from compiled schema BEFORE schema is moved.
99        #[cfg(feature = "federation")]
100        let circuit_breaker = schema.federation.as_ref().and_then(
101            crate::federation::circuit_breaker::FederationCircuitBreakerManager::from_config,
102        );
103        #[cfg(not(feature = "federation"))]
104        let circuit_breaker: Option<()> = None;
105        #[cfg(not(feature = "federation"))]
106        let _ = &schema.federation;
107        let error_sanitizer = Self::error_sanitizer_from_schema(&schema);
108        #[cfg(feature = "auth")]
109        let state_encryption = Self::state_encryption_from_schema(&schema)?;
110        #[cfg(not(feature = "auth"))]
111        let state_encryption: Option<
112            std::sync::Arc<crate::auth::state_encryption::StateEncryptionService>,
113        > = None;
114        #[cfg(feature = "auth")]
115        let pkce_store = Self::pkce_store_from_schema(&schema, state_encryption.as_ref()).await;
116        #[cfg(not(feature = "auth"))]
117        let pkce_store: Option<std::sync::Arc<crate::auth::PkceStateStore>> = None;
118        #[cfg(feature = "auth")]
119        let oidc_server_client = Self::oidc_server_client_from_schema(&schema);
120        #[cfg(not(feature = "auth"))]
121        let oidc_server_client: Option<std::sync::Arc<crate::auth::OidcServerClient>> = None;
122        let schema_rate_limiter = Self::rate_limiter_from_schema(&schema).await;
123        let api_key_authenticator = crate::api_key::api_key_authenticator_from_schema(&schema);
124        if api_key_authenticator.is_some() {
125            info!("API key authentication enabled");
126        }
127        let revocation_manager = crate::token_revocation::revocation_manager_from_schema(&schema);
128        if revocation_manager.is_some() {
129            info!("Token revocation enabled");
130        }
131        let trusted_docs = Self::trusted_docs_from_schema(&schema);
132
133        // Validate cache + RLS safety at startup.
134        // Cache isolation relies entirely on per-user WHERE clauses in the cache key.
135        // Without RLS, users with the same query and variables share the same cached
136        // response, which can leak data across tenants.
137        if config.cache_enabled && !schema.has_rls_configured() {
138            if schema.is_multi_tenant() {
139                // Multi-tenant + cache + no RLS is a hard safety violation.
140                return Err(ServerError::ConfigError(
141                    "Cache is enabled in a multi-tenant schema but no Row-Level Security \
142                     policies are declared. This would allow cross-tenant cache hits and \
143                     data leakage. In fraiseql.toml, either disable caching with \
144                     [cache] enabled = false, declare [security.rls] policies, or set \
145                     [security] multi_tenant = false to acknowledge single-tenant mode."
146                        .to_string(),
147                ));
148            }
149            // Single-tenant with cache and no RLS: safe, but warn in case of misconfiguration.
150            warn!(
151                "Query-result caching is enabled but no Row-Level Security policies are \
152                 declared in the compiled schema. This is safe for single-tenant deployments. \
153                 For multi-tenant deployments, declare RLS policies and set \
154                 `security.multi_tenant = true` in your schema."
155            );
156        }
157
158        // Build cache from config.
159        let cache_config = CacheConfig::from(config.cache_enabled);
160        let cache = QueryResultCache::new(cache_config);
161
162        // Log cache state before consuming config.
163        if cache_config.enabled {
164            tracing::info!(
165                max_entries   = cache_config.max_entries,
166                ttl_seconds   = cache_config.ttl_seconds,
167                rls_enforcement = ?cache_config.rls_enforcement,
168                "Query result cache: active"
169            );
170        } else {
171            tracing::info!("Query result cache: disabled");
172        }
173
174        // Read subscription config from compiled schema (hooks, limits).
175        let subscriptions_config = schema.subscriptions_config.clone();
176
177        // Unwrap Arc: refcount is 1 here — adapter has not been cloned since being passed in.
178        let inner = Arc::into_inner(adapter)
179            .expect("CachedDatabaseAdapter wrapping requires exclusive Arc ownership at startup");
180        let cached = CachedDatabaseAdapter::new(inner, cache, schema.content_hash())
181            .with_ttl_overrides_from_schema(&schema);
182        let executor = Arc::new(Executor::new(schema.clone(), Arc::new(cached)));
183        let subscription_manager = Arc::new(SubscriptionManager::new(Arc::new(schema)));
184
185        let mut server = Self::from_executor(
186            config,
187            executor,
188            subscription_manager,
189            circuit_breaker,
190            error_sanitizer,
191            state_encryption,
192            pkce_store,
193            oidc_server_client,
194            schema_rate_limiter,
195            api_key_authenticator,
196            revocation_manager,
197            trusted_docs,
198            db_pool,
199        )
200        .await?;
201
202        server.adapter_cache_enabled = cache_config.enabled;
203
204        // Apply pool tuning config from ServerConfig (if present).
205        if let Some(pt) = server.config.pool_tuning.clone() {
206            if pt.enabled {
207                server = server
208                    .with_pool_tuning(pt)
209                    .map_err(|e| ServerError::ConfigError(format!("pool_tuning: {e}")))?;
210            }
211        }
212
213        // Initialize MCP config from compiled schema when the feature is compiled in.
214        #[cfg(feature = "mcp")]
215        if let Some(ref cfg) = server.executor.schema().mcp_config {
216            if cfg.enabled {
217                let tool_count =
218                    crate::mcp::tools::schema_to_tools(server.executor.schema(), cfg).len();
219                info!(
220                    path = %cfg.path,
221                    transport = %cfg.transport,
222                    tools = tool_count,
223                    "MCP server configured"
224                );
225                server.mcp_config = Some(cfg.clone());
226            }
227        }
228
229        // Initialize APQ store when enabled.
230        if server.config.apq_enabled {
231            let apq_store: fraiseql_core::apq::ArcApqStorage =
232                Arc::new(fraiseql_core::apq::InMemoryApqStorage::default());
233            server.apq_store = Some(apq_store);
234            info!("APQ (Automatic Persisted Queries) enabled — in-memory backend");
235        }
236
237        // Apply subscription lifecycle/limits from compiled schema.
238        if let Some(ref subs) = subscriptions_config {
239            if let Some(max) = subs.max_subscriptions_per_connection {
240                server.max_subscriptions_per_connection = Some(max);
241            }
242            if let Some(lifecycle) = crate::subscriptions::WebhookLifecycle::from_config(subs) {
243                server.subscription_lifecycle = Arc::new(lifecycle);
244            }
245        }
246
247        Ok(server)
248    }
249}
250
251impl<A: DatabaseAdapter + Clone + Send + Sync + 'static> Server<A> {
252    /// Shared initialization path used by both `new` and `with_relay_pagination`.
253    ///
254    /// Accepts a pre-built executor so that relay vs. non-relay constructors can supply
255    /// the appropriate variant without duplicating auth/rate-limiter/observer setup.
256    #[allow(clippy::too_many_arguments)]
257    // Reason: internal constructor collects all pre-built subsystems; a builder struct would not
258    // reduce call-site clarity
259    #[allow(clippy::cognitive_complexity)] // Reason: internal constructor that assembles server from pre-built subsystems
260    pub(super) async fn from_executor(
261        config: ServerConfig,
262        executor: Arc<Executor<A>>,
263        subscription_manager: Arc<SubscriptionManager>,
264        #[cfg(feature = "federation")] circuit_breaker: Option<
265            Arc<crate::federation::circuit_breaker::FederationCircuitBreakerManager>,
266        >,
267        #[cfg(not(feature = "federation"))] _circuit_breaker: Option<()>,
268        error_sanitizer: Arc<crate::config::error_sanitization::ErrorSanitizer>,
269        state_encryption: Option<Arc<crate::auth::state_encryption::StateEncryptionService>>,
270        pkce_store: Option<Arc<crate::auth::PkceStateStore>>,
271        oidc_server_client: Option<Arc<crate::auth::OidcServerClient>>,
272        schema_rate_limiter: Option<Arc<RateLimiter>>,
273        api_key_authenticator: Option<Arc<crate::api_key::ApiKeyAuthenticator>>,
274        revocation_manager: Option<Arc<crate::token_revocation::TokenRevocationManager>>,
275        trusted_docs: Option<Arc<crate::trusted_documents::TrustedDocumentStore>>,
276        // `db_pool` is forwarded to the observer runtime; unused when the `observers` feature is
277        // off.
278        #[cfg_attr(not(feature = "observers"), allow(unused_variables))] db_pool: Option<
279            sqlx::PgPool,
280        >,
281    ) -> Result<Self> {
282        // Initialize OIDC validator if auth is configured
283        let oidc_validator = if let Some(ref auth_config) = config.auth {
284            info!(
285                issuer = %auth_config.issuer,
286                "Initializing OIDC authentication"
287            );
288            let validator = OidcValidator::new(auth_config.clone())
289                .await
290                .map_err(|e| ServerError::ConfigError(format!("Failed to initialize OIDC: {e}")))?;
291            Some(Arc::new(validator))
292        } else {
293            None
294        };
295
296        // Initialize HS256 validator if configured (mutually exclusive with OIDC).
297        let hs256_auth = build_hs256_auth(&config)?;
298
299        // Initialize rate limiter: compiled schema config takes priority over server config.
300        let rate_limiter = if let Some(rl) = schema_rate_limiter {
301            Some(rl)
302        } else if let Some(ref rate_config) = config.rate_limiting {
303            if rate_config.enabled {
304                info!(
305                    rps_per_ip = rate_config.rps_per_ip,
306                    rps_per_user = rate_config.rps_per_user,
307                    "Initializing rate limiting from server config"
308                );
309                Some(Arc::new(RateLimiter::new(rate_config.clone())))
310            } else {
311                info!("Rate limiting disabled by configuration");
312                None
313            }
314        } else {
315            None
316        };
317
318        // Initialize observer runtime
319        #[cfg(feature = "observers")]
320        let observer_runtime = Self::init_observer_runtime(&config, db_pool.as_ref()).await;
321
322        // Initialize Flight service with OIDC authentication if configured
323        #[cfg(feature = "arrow")]
324        let flight_service = {
325            let mut service = FraiseQLFlightService::new();
326            if let Some(ref validator) = oidc_validator {
327                info!("Enabling OIDC authentication for Arrow Flight");
328                service.set_oidc_validator(validator.clone());
329            } else {
330                info!("Arrow Flight initialized without authentication (dev mode)");
331            }
332            Some(service)
333        };
334
335        // Warn if PKCE is configured but [auth] is missing (no OidcServerClient).
336        #[cfg(feature = "auth")]
337        if pkce_store.is_some() && oidc_server_client.is_none() {
338            tracing::error!(
339                "pkce.enabled = true but [auth] is not configured or OIDC client init failed. \
340                 Auth routes (/auth/start, /auth/callback) will NOT be mounted. \
341                 Add [auth] with discovery_url, client_id, client_secret_env, and \
342                 server_redirect_uri to fraiseql.toml and recompile the schema."
343            );
344        }
345
346        // Refuse to start if FRAISEQL_REQUIRE_REDIS is set and PKCE store is in-memory.
347        #[cfg(feature = "auth")]
348        Self::check_redis_requirement(pkce_store.as_ref())?;
349
350        // Spawn background PKCE state cleanup task (every 5 minutes).
351        #[cfg(feature = "auth")]
352        if let Some(ref store) = pkce_store {
353            use std::time::Duration;
354
355            use tokio::time::MissedTickBehavior;
356            let store_clone = Arc::clone(store);
357            tokio::spawn(async move {
358                let mut ticker = tokio::time::interval(Duration::from_secs(300));
359                ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
360                loop {
361                    ticker.tick().await;
362                    store_clone.cleanup_expired().await;
363                }
364            });
365        }
366
367        // Reason: state_encryption/pkce_store/oidc_server_client are only stored when
368        //         feature = "auth" is enabled; without it they are legitimately unused.
369        #[cfg(not(feature = "auth"))]
370        let _ = (state_encryption, pkce_store, oidc_server_client);
371        Ok(Self {
372            config,
373            executor,
374            subscription_manager,
375            subscription_lifecycle: Arc::new(crate::subscriptions::NoopLifecycle),
376            max_subscriptions_per_connection: None,
377            oidc_validator,
378            hs256_auth,
379            rate_limiter,
380            #[cfg(feature = "secrets")]
381            secrets_manager: None,
382            #[cfg(feature = "federation")]
383            circuit_breaker,
384            error_sanitizer,
385            #[cfg(feature = "auth")]
386            state_encryption,
387            #[cfg(feature = "auth")]
388            pkce_store,
389            #[cfg(feature = "auth")]
390            oidc_server_client,
391            api_key_authenticator,
392            revocation_manager,
393            apq_store: None,
394            trusted_docs,
395            #[cfg(feature = "observers")]
396            observer_runtime,
397            #[cfg(feature = "observers")]
398            db_pool,
399            #[cfg(feature = "arrow")]
400            flight_service,
401            #[cfg(feature = "mcp")]
402            mcp_config: None,
403            pool_tuning_config: None,
404            adapter_cache_enabled: false,
405        })
406    }
407
408    /// Set lifecycle hooks for `WebSocket` subscriptions.
409    #[must_use]
410    pub fn with_subscription_lifecycle(
411        mut self,
412        lifecycle: Arc<dyn crate::subscriptions::SubscriptionLifecycle>,
413    ) -> Self {
414        self.subscription_lifecycle = lifecycle;
415        self
416    }
417
418    /// Set maximum subscriptions allowed per `WebSocket` connection.
419    #[must_use]
420    pub const fn with_max_subscriptions_per_connection(mut self, max: u32) -> Self {
421        self.max_subscriptions_per_connection = Some(max);
422        self
423    }
424
425    /// Enable adaptive connection pool sizing.
426    ///
427    /// When `config.enabled` is `true`, the server will spawn a background
428    /// polling task that samples pool metrics and recommends or applies resizes.
429    ///
430    /// # Errors
431    ///
432    /// Returns an error string if the configuration fails validation (e.g. `min >= max`).
433    pub fn with_pool_tuning(
434        mut self,
435        config: crate::config::pool_tuning::PoolPressureMonitorConfig,
436    ) -> std::result::Result<Self, String> {
437        config.validate()?;
438        self.pool_tuning_config = Some(config);
439        Ok(self)
440    }
441
442    /// Set secrets manager for the server.
443    ///
444    /// This allows attaching a secrets manager after server creation for credential management.
445    #[cfg(feature = "secrets")]
446    pub fn set_secrets_manager(&mut self, manager: Arc<crate::secrets_manager::SecretsManager>) {
447        self.secrets_manager = Some(manager);
448        info!("Secrets manager attached to server");
449    }
450
451    /// Serve MCP over stdio (stdin/stdout) instead of HTTP.
452    ///
453    /// This is used when `FRAISEQL_MCP_STDIO=1` is set.  The server reads JSON-RPC
454    /// messages from stdin and writes responses to stdout, following the MCP stdio
455    /// transport specification.
456    ///
457    /// # Errors
458    ///
459    /// Returns an error if MCP is not configured or the stdio transport fails.
460    #[cfg(feature = "mcp")]
461    pub async fn serve_mcp_stdio(self) -> Result<()> {
462        use rmcp::ServiceExt;
463
464        let mcp_cfg = self.mcp_config.ok_or_else(|| {
465            ServerError::ConfigError(
466                "FRAISEQL_MCP_STDIO=1 but MCP is not configured. \
467                 Add [mcp] enabled = true to fraiseql.toml and recompile the schema."
468                    .into(),
469            )
470        })?;
471
472        let schema = Arc::new(self.executor.schema().clone());
473        let executor = self.executor.clone();
474
475        let service = crate::mcp::handler::FraiseQLMcpService::new(schema, executor, mcp_cfg);
476
477        info!("MCP stdio transport starting — reading from stdin, writing to stdout");
478
479        let running = service
480            .serve((tokio::io::stdin(), tokio::io::stdout()))
481            .await
482            .map_err(|e| ServerError::ConfigError(format!("MCP stdio init failed: {e}")))?;
483
484        running
485            .waiting()
486            .await
487            .map_err(|e| ServerError::ConfigError(format!("MCP stdio error: {e}")))?;
488
489        Ok(())
490    }
491}