Skip to main content

fraiseql_server/
server.rs

1//! HTTP server implementation.
2
3use std::sync::Arc;
4
5use axum::{
6    Router, extract::DefaultBodyLimit, middleware,
7    routing::{get, post},
8};
9#[cfg(feature = "arrow")]
10use fraiseql_arrow::FraiseQLFlightService;
11use fraiseql_core::{
12    db::traits::{DatabaseAdapter, RelayDatabaseAdapter},
13    runtime::{Executor, SubscriptionManager},
14    schema::CompiledSchema,
15    security::OidcValidator,
16};
17use tokio::net::TcpListener;
18#[cfg(any(feature = "observers", feature = "redis-rate-limiting", feature = "redis-pkce"))]
19use tracing::error;
20use tracing::{info, warn};
21#[cfg(feature = "observers")]
22use {
23    crate::observers::{ObserverRuntime, ObserverRuntimeConfig},
24    tokio::sync::RwLock,
25};
26
27use crate::{
28    Result, ServerError,
29    middleware::{
30        BearerAuthState, OidcAuthState, RateLimiter, bearer_auth_middleware, cors_layer_restricted,
31        metrics_middleware, oidc_auth_middleware, require_json_content_type, trace_layer,
32    },
33    routes::{
34        AuthPkceState, PlaygroundState, SubscriptionState, api, auth_callback, auth_start,
35        graphql::AppState, graphql_get_handler, graphql_handler, health_handler,
36        introspection_handler, metrics_handler, metrics_json_handler, playground_handler,
37        subscription_handler,
38    },
39    server_config::ServerConfig,
40    tls::TlsSetup,
41};
42
/// FraiseQL HTTP Server.
///
/// Holds the executor plus every optional subsystem (auth, rate limiting, PKCE,
/// trusted documents, APQ, observers, Arrow Flight, MCP) assembled by `Server::new`.
pub struct Server<A: DatabaseAdapter> {
    /// Server configuration supplied to the constructor.
    config:               ServerConfig,
    /// GraphQL executor built over the compiled schema and database adapter.
    executor:             Arc<Executor<A>>,
    /// Subscription manager sharing the compiled schema.
    subscription_manager: Arc<SubscriptionManager>,
    /// WebSocket subscription lifecycle hooks; a no-op implementation unless replaced
    /// (by the schema's subscription config or `with_subscription_lifecycle`).
    subscription_lifecycle: Arc<dyn crate::subscriptions::SubscriptionLifecycle>,
    /// Per-connection subscription cap; `None` when no limit was configured.
    max_subscriptions_per_connection: Option<u32>,
    /// OIDC token validator, present when `config.auth` is set.
    oidc_validator:       Option<Arc<OidcValidator>>,
    /// Request rate limiter; the schema's `security.rate_limiting` takes priority over
    /// `config.rate_limiting`.
    rate_limiter:         Option<Arc<RateLimiter>>,
    /// Secrets manager, attached after construction via `set_secrets_manager`.
    secrets_manager:      Option<Arc<crate::secrets_manager::SecretsManager>>,
    /// Federation circuit-breaker manager, built from the schema's federation section.
    circuit_breaker:
        Option<Arc<crate::federation::circuit_breaker::FederationCircuitBreakerManager>>,
    /// Error sanitizer from `security.error_sanitization`; disabled when not configured.
    error_sanitizer:      Arc<crate::config::error_sanitization::ErrorSanitizer>,
    /// State-encryption service from `security.state_encryption`, when enabled.
    state_encryption:     Option<Arc<crate::auth::state_encryption::StateEncryptionService>>,
    /// PKCE state store (in-memory or Redis-backed) when `security.pkce` is enabled.
    pkce_store:           Option<Arc<crate::auth::PkceStateStore>>,
    /// Server-side OIDC client built from the schema's `[auth]` section.
    oidc_server_client:   Option<Arc<crate::auth::OidcServerClient>>,
    /// API-key authenticator, when configured in the compiled schema.
    api_key_authenticator: Option<Arc<crate::api_key::ApiKeyAuthenticator>>,
    /// Token revocation manager, when configured in the compiled schema.
    revocation_manager:   Option<Arc<crate::token_revocation::TokenRevocationManager>>,
    /// Automatic Persisted Queries store; in-memory when `config.apq_enabled`.
    apq_store:            Option<Arc<dyn fraiseql_core::apq::ApqStorage>>,
    /// Trusted-document store from `security.trusted_documents`, when enabled and loaded.
    trusted_docs:         Option<Arc<crate::trusted_documents::TrustedDocumentStore>>,

    /// Observer runtime, initialised from the server config and database pool.
    #[cfg(feature = "observers")]
    observer_runtime: Option<Arc<RwLock<ObserverRuntime>>>,

    /// Database pool supplied to the constructor (used for observer initialisation).
    #[cfg(feature = "observers")]
    db_pool: Option<sqlx::PgPool>,

    /// Arrow Flight service; uses `oidc_validator` for authentication when present.
    #[cfg(feature = "arrow")]
    flight_service: Option<FraiseQLFlightService>,

    /// MCP configuration, set when the schema's `mcp_config` parses and is enabled.
    #[cfg(feature = "mcp")]
    mcp_config: Option<crate::mcp::McpConfig>,
}
76
77impl<A: DatabaseAdapter + Clone + Send + Sync + 'static> Server<A> {
78    /// Build a `StateEncryptionService` from `security.state_encryption` in the compiled
79    /// schema, if the section is present and `enabled = true`.
80    ///
81    /// # Errors
82    ///
83    /// Returns `ServerError::ConfigError` when `enabled = true` but the key environment
84    /// variable is absent or invalid.  The server must not start in this state.
85    fn state_encryption_from_schema(
86        schema: &CompiledSchema,
87    ) -> crate::Result<Option<Arc<crate::auth::state_encryption::StateEncryptionService>>> {
88        match schema.security.as_ref() {
89            None => Ok(None),
90            Some(s) => {
91                crate::auth::state_encryption::StateEncryptionService::from_compiled_schema(s)
92                    .map_err(|e| ServerError::ConfigError(e.to_string()))
93            },
94        }
95    }
96
97    /// Build a `PkceStateStore` from the compiled schema if `security.pkce.enabled = true`.
98    ///
99    /// When `redis_url` is set and the `redis-pkce` feature is compiled in, initialises
100    /// a Redis-backed distributed store; otherwise falls back to the in-memory backend
101    /// with a warning.
102    async fn pkce_store_from_schema(
103        schema: &CompiledSchema,
104        state_encryption: Option<&Arc<crate::auth::state_encryption::StateEncryptionService>>,
105    ) -> Option<Arc<crate::auth::PkceStateStore>> {
106        let security = schema.security.as_ref()?;
107        let pkce_cfg = security.get("pkce")?;
108
109        #[derive(serde::Deserialize)]
110        struct PkceCfgMinimal {
111            #[serde(default)]
112            enabled:        bool,
113            #[serde(default = "default_ttl")]
114            state_ttl_secs: u64,
115            #[serde(default = "default_method")]
116            code_challenge_method: String,
117            redis_url: Option<String>,
118        }
119        fn default_ttl()    -> u64    { 600 }
120        fn default_method() -> String { "S256".into() }
121
122        let cfg: PkceCfgMinimal = serde_json::from_value(pkce_cfg.clone()).ok()?;
123        if !cfg.enabled {
124            return None;
125        }
126
127        if state_encryption.is_none() {
128            warn!(
129                "pkce.enabled = true but state_encryption is disabled. \
130                 PKCE state tokens are sent to the OIDC provider unencrypted. \
131                 Enable [security.state_encryption] in production for full protection."
132            );
133        }
134
135        if cfg.code_challenge_method.eq_ignore_ascii_case("plain") {
136            warn!(
137                "pkce.code_challenge_method = \"plain\" is insecure. \
138                 Use \"S256\" in all production environments."
139            );
140        }
141
142        let enc = state_encryption.cloned();
143
144        // Prefer the Redis backend when redis_url is configured and the feature is compiled in.
145        #[cfg(feature = "redis-pkce")]
146        if let Some(ref url) = cfg.redis_url {
147            match crate::auth::PkceStateStore::new_redis(url, cfg.state_ttl_secs, enc.clone())
148                .await
149            {
150                Ok(store) => {
151                    info!(redis_url = %url, "PKCE state store: Redis backend");
152                    return Some(Arc::new(store));
153                }
154                Err(e) => {
155                    error!(
156                        error = %e,
157                        redis_url = %url,
158                        "Failed to connect to Redis PKCE store — falling back to in-memory"
159                    );
160                }
161            }
162        }
163
164        #[cfg(not(feature = "redis-pkce"))]
165        if cfg.redis_url.is_some() {
166            warn!(
167                "pkce.redis_url is set but the `redis-pkce` Cargo feature is not compiled in. \
168                 Rebuild with `--features redis-pkce` to enable the Redis PKCE backend. \
169                 Falling back to in-memory storage."
170            );
171        }
172
173        warn!(
174            "PKCE state store: in-memory. In a multi-replica deployment, auth flows will fail \
175             if /auth/start and /auth/callback hit different replicas. \
176             Set [security.pkce] redis_url to enable the Redis backend, \
177             or FRAISEQL_REQUIRE_REDIS=1 to enforce it at startup."
178        );
179
180        Some(Arc::new(crate::auth::PkceStateStore::new(cfg.state_ttl_secs, enc)))
181    }
182
183    /// Validate that distributed storage is configured when `FRAISEQL_REQUIRE_REDIS` is set.
184    ///
185    /// When `FRAISEQL_REQUIRE_REDIS=1` is present in the environment, the server refuses
186    /// to start if the PKCE state store is using in-memory storage.  This prevents silent
187    /// per-replica state isolation in multi-instance deployments.
188    ///
189    /// # Errors
190    ///
191    /// Returns `ServerError::ConfigError` with an operator-actionable message when the
192    /// constraint is violated.
193    fn check_redis_requirement(
194        pkce_store: Option<&Arc<crate::auth::PkceStateStore>>,
195    ) -> crate::Result<()> {
196        if std::env::var("FRAISEQL_REQUIRE_REDIS").is_ok() {
197            let pkce_in_memory = pkce_store.is_some_and(|s| s.is_in_memory());
198            if pkce_in_memory {
199                return Err(ServerError::ConfigError(concat!(
200                    "FraiseQL failed to start\n\n",
201                    "  FRAISEQL_REQUIRE_REDIS is set but PKCE auth state is using in-memory storage.\n",
202                    "  In a multi-replica deployment, auth callbacks can fail if they hit a\n",
203                    "  different replica than the one that handled /auth/start.\n\n",
204                    "  To fix:\n",
205                    "    [security.pkce]\n",
206                    "    redis_url = \"redis://localhost:6379\"\n\n",
207                    "    [security.rate_limiting]\n",
208                    "    redis_url = \"redis://localhost:6379\"\n\n",
209                    "  To allow in-memory (single-replica only):\n",
210                    "    Unset FRAISEQL_REQUIRE_REDIS",
211                )
212                .into()));
213            }
214        }
215        Ok(())
216    }
217
218    /// Build an `OidcServerClient` from the compiled schema JSON, if `[auth]` is present.
219    fn oidc_server_client_from_schema(
220        schema: &CompiledSchema,
221    ) -> Option<Arc<crate::auth::OidcServerClient>> {
222        // The full schema JSON lives in the executor's compiled schema.
223        // Access it via the security Value (which contains the embedded JSON blob).
224        // We expose the root schema JSON here.
225        let schema_json = serde_json::to_value(schema).ok()?;
226        crate::auth::OidcServerClient::from_compiled_schema(&schema_json)
227    }
228
229    /// Build a `RateLimiter` from the `security.rate_limiting` key embedded in the
230    /// compiled schema, if present and `enabled = true`.
231    ///
232    /// When `redis_url` is set and the `redis-rate-limiting` feature is compiled in,
233    /// initialises a Redis-backed distributed limiter; otherwise falls back to the
234    /// in-memory backend (with a warning when `redis_url` is set but the feature is
235    /// absent).
236    async fn rate_limiter_from_schema(schema: &CompiledSchema) -> Option<Arc<RateLimiter>> {
237        let sec: crate::middleware::RateLimitingSecurityConfig = schema
238            .security
239            .as_ref()
240            .and_then(|s| s.get("rate_limiting"))
241            .and_then(|v| serde_json::from_value(v.clone()).ok())?;
242
243        if !sec.enabled {
244            return None;
245        }
246
247        let config = crate::middleware::RateLimitConfig::from_security_config(&sec);
248
249        let limiter: RateLimiter = if let Some(ref redis_url) = sec.redis_url {
250            #[cfg(feature = "redis-rate-limiting")]
251            {
252                match RateLimiter::new_redis(redis_url, config.clone()).await {
253                    Ok(rl) => {
254                        info!(
255                            url = redis_url.as_str(),
256                            rps_per_ip = config.rps_per_ip,
257                            burst_size = config.burst_size,
258                            "Rate limiting: using Redis distributed backend"
259                        );
260                        rl.with_path_rules_from_security(&sec)
261                    },
262                    Err(e) => {
263                        error!(
264                            error = %e,
265                            "Failed to connect to Redis for rate limiting — \
266                             falling back to in-memory backend"
267                        );
268                        RateLimiter::new(config).with_path_rules_from_security(&sec)
269                    },
270                }
271            }
272            #[cfg(not(feature = "redis-rate-limiting"))]
273            {
274                let _ = redis_url;
275                warn!(
276                    "rate_limiting.redis_url is set but the server was compiled without the \
277                     'redis-rate-limiting' feature. Using in-memory backend."
278                );
279                RateLimiter::new(config).with_path_rules_from_security(&sec)
280            }
281        } else {
282            info!(
283                rps_per_ip = config.rps_per_ip,
284                burst_size = config.burst_size,
285                "Rate limiting: using in-memory backend"
286            );
287            RateLimiter::new(config).with_path_rules_from_security(&sec)
288        };
289
290        Some(Arc::new(limiter))
291    }
292
293    /// Build an `ErrorSanitizer` from the `security.error_sanitization` key in the
294    /// compiled schema's security blob (if present), falling back to a disabled sanitizer.
295    fn error_sanitizer_from_schema(
296        schema: &CompiledSchema,
297    ) -> Arc<crate::config::error_sanitization::ErrorSanitizer> {
298        let sanitizer = schema
299            .security
300            .as_ref()
301            .and_then(|s| s.get("error_sanitization"))
302            .and_then(|v| {
303                serde_json::from_value::<
304                    crate::config::error_sanitization::ErrorSanitizationConfig,
305                >(v.clone())
306                .ok()
307            })
308            .map(crate::config::error_sanitization::ErrorSanitizer::new)
309            .unwrap_or_else(crate::config::error_sanitization::ErrorSanitizer::disabled);
310        Arc::new(sanitizer)
311    }
312
313    /// Build a `TrustedDocumentStore` from `security.trusted_documents` in the
314    /// compiled schema, if present and `enabled = true`.
315    fn trusted_docs_from_schema(
316        schema: &CompiledSchema,
317    ) -> Option<Arc<crate::trusted_documents::TrustedDocumentStore>> {
318        let security = schema.security.as_ref()?;
319        let td_cfg = security.get("trusted_documents")?;
320
321        #[derive(serde::Deserialize)]
322        struct TdCfgMinimal {
323            #[serde(default)]
324            enabled: bool,
325            #[serde(default)]
326            mode: String,
327            manifest_path: Option<String>,
328            #[allow(dead_code)]
329            manifest_url: Option<String>,
330            #[serde(default)]
331            reload_interval_secs: u64,
332        }
333
334        let cfg: TdCfgMinimal = serde_json::from_value(td_cfg.clone()).ok()?;
335        if !cfg.enabled {
336            return None;
337        }
338
339        let mode = if cfg.mode.eq_ignore_ascii_case("strict") {
340            crate::trusted_documents::TrustedDocumentMode::Strict
341        } else {
342            crate::trusted_documents::TrustedDocumentMode::Permissive
343        };
344
345        if let Some(ref path) = cfg.manifest_path {
346            match crate::trusted_documents::TrustedDocumentStore::from_manifest_file(
347                std::path::Path::new(path),
348                mode,
349            ) {
350                Ok(store) => {
351                    let store = Arc::new(store);
352                    // Spawn hot-reload task if configured.
353                    if cfg.reload_interval_secs > 0 {
354                        if let Some(ref url) = cfg.manifest_url {
355                            Self::spawn_trusted_docs_reload(
356                                Arc::clone(&store),
357                                url.clone(),
358                                cfg.reload_interval_secs,
359                            );
360                        } else {
361                            warn!(
362                                "trusted_documents.reload_interval_secs > 0 but no manifest_url set \
363                                 — hot-reload disabled (file-based manifests must be reloaded manually)"
364                            );
365                        }
366                    }
367                    info!(
368                        manifest = %path,
369                        mode = ?mode,
370                        "Trusted documents loaded"
371                    );
372                    Some(store)
373                }
374                Err(e) => {
375                    tracing::error!(error = %e, "Failed to load trusted documents manifest");
376                    None
377                }
378            }
379        } else {
380            warn!("trusted_documents.enabled = true but no manifest_path or manifest_url set");
381            None
382        }
383    }
384
385    /// Spawn a background task that periodically re-fetches the manifest from a URL.
386    fn spawn_trusted_docs_reload(
387        store: Arc<crate::trusted_documents::TrustedDocumentStore>,
388        url: String,
389        interval_secs: u64,
390    ) {
391        tokio::spawn(async move {
392            let mut ticker =
393                tokio::time::interval(std::time::Duration::from_secs(interval_secs));
394            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
395            loop {
396                ticker.tick().await;
397                match reqwest::get(&url).await {
398                    Ok(resp) => match resp.text().await {
399                        Ok(body) => {
400                            #[derive(serde::Deserialize)]
401                            struct Manifest {
402                                documents: std::collections::HashMap<String, String>,
403                            }
404                            match serde_json::from_str::<Manifest>(&body) {
405                                Ok(manifest) => {
406                                    let count = manifest.documents.len();
407                                    store.replace_documents(manifest.documents).await;
408                                    info!(
409                                        count,
410                                        "Trusted documents manifest reloaded"
411                                    );
412                                }
413                                Err(e) => {
414                                    warn!(error = %e, "Failed to parse trusted documents manifest");
415                                }
416                            }
417                        }
418                        Err(e) => {
419                            warn!(error = %e, "Failed to read trusted documents manifest response");
420                        }
421                    },
422                    Err(e) => {
423                        warn!(error = %e, "Failed to fetch trusted documents manifest");
424                    }
425                }
426            }
427        });
428    }
429
430    /// Create new server.
431    ///
432    /// Relay pagination queries will return a `Validation` error at runtime. Use
433    /// [`Server::with_relay_pagination`] when the adapter implements [`RelayDatabaseAdapter`]
434    /// and relay support is required.
435    ///
436    /// # Arguments
437    ///
438    /// * `config` - Server configuration
439    /// * `schema` - Compiled GraphQL schema
440    /// * `adapter` - Database adapter
441    /// * `db_pool` - Database connection pool (optional, required for observers)
442    ///
443    /// # Errors
444    ///
445    /// Returns error if OIDC validator initialization fails (e.g., unable to
446    /// fetch discovery document or JWKS).
447    ///
448    /// # Example
449    ///
450    /// ```rust,ignore
451    /// let config = ServerConfig::default();
452    /// let schema = CompiledSchema::from_json(schema_json)?;
453    /// let adapter = Arc::new(PostgresAdapter::new(db_url).await?);
454    ///
455    /// let server = Server::new(config, schema, adapter, None).await?;
456    /// server.serve().await?;
457    /// ```
458    pub async fn new(
459        config: ServerConfig,
460        schema: CompiledSchema,
461        adapter: Arc<A>,
462        #[allow(unused_variables)] db_pool: Option<sqlx::PgPool>,
463    ) -> Result<Self> {
464        // Read security configs from compiled schema BEFORE schema is moved.
465        let circuit_breaker = schema
466            .federation
467            .as_ref()
468            .and_then(crate::federation::circuit_breaker::FederationCircuitBreakerManager::from_schema_json);
469        let error_sanitizer    = Self::error_sanitizer_from_schema(&schema);
470        let state_encryption   = Self::state_encryption_from_schema(&schema)?;
471        let pkce_store         = Self::pkce_store_from_schema(&schema, state_encryption.as_ref()).await;
472        let oidc_server_client = Self::oidc_server_client_from_schema(&schema);
473        let schema_rate_limiter = Self::rate_limiter_from_schema(&schema).await;
474        let api_key_authenticator = crate::api_key::api_key_authenticator_from_schema(&schema);
475        if api_key_authenticator.is_some() {
476            info!("API key authentication enabled");
477        }
478        let revocation_manager = crate::token_revocation::revocation_manager_from_schema(&schema);
479        if revocation_manager.is_some() {
480            info!("Token revocation enabled");
481        }
482        let trusted_docs = Self::trusted_docs_from_schema(&schema);
483
484        // Warn when query-result caching is active but no RLS policies are declared.
485        // Cache isolation relies on per-user WHERE clauses in the cache key.  Without RLS,
486        // all users share the same (empty) WHERE clause and therefore share cache entries,
487        // which can leak data between tenants in multi-tenant deployments.
488        if config.cache_enabled && !schema.has_rls_configured() {
489            warn!(
490                "Query-result caching is enabled but no Row-Level Security policies are declared \
491                 in the compiled schema. Cache isolation relies on per-user WHERE clauses in cache \
492                 keys. Without RLS, users with the same query and variables will receive the same \
493                 cached response. This is safe for single-tenant deployments but WILL LEAK DATA \
494                 between tenants in multi-tenant deployments. Declare policies in fraiseql.toml \
495                 or set cache_enabled = false if you are using PostgreSQL-native RLS without \
496                 FraiseQL policy injection."
497            );
498        }
499
500        // Read subscription config from compiled schema (hooks, limits).
501        let subscriptions_config_json = schema.subscriptions_config.clone();
502
503        let executor = Arc::new(Executor::new(schema.clone(), adapter));
504        let subscription_manager = Arc::new(SubscriptionManager::new(Arc::new(schema)));
505
506        let mut server = Self::from_executor(
507            config,
508            executor,
509            subscription_manager,
510            circuit_breaker,
511            error_sanitizer,
512            state_encryption,
513            pkce_store,
514            oidc_server_client,
515            schema_rate_limiter,
516            api_key_authenticator,
517            revocation_manager,
518            trusted_docs,
519            db_pool,
520        )
521        .await?;
522
523        // Initialize MCP config from compiled schema when the feature is compiled in.
524        #[cfg(feature = "mcp")]
525        {
526            if let Some(ref mcp_json) = server.executor.schema().mcp_config {
527                match serde_json::from_value::<crate::mcp::McpConfig>(mcp_json.clone()) {
528                    Ok(cfg) if cfg.enabled => {
529                        let tool_count = crate::mcp::tools::schema_to_tools(
530                            server.executor.schema(),
531                            &cfg,
532                        ).len();
533                        info!(
534                            path = %cfg.path,
535                            transport = %cfg.transport,
536                            tools = tool_count,
537                            "MCP server configured"
538                        );
539                        server.mcp_config = Some(cfg);
540                    }
541                    Ok(_) => {}
542                    Err(e) => {
543                        warn!(error = %e, "Invalid mcp_config in compiled schema — MCP disabled");
544                    }
545                }
546            }
547        }
548
549        // Initialize APQ store when enabled.
550        if server.config.apq_enabled {
551            let apq_store: Arc<dyn fraiseql_core::apq::ApqStorage> =
552                Arc::new(fraiseql_core::apq::InMemoryApqStorage::default());
553            server.apq_store = Some(apq_store);
554            info!("APQ (Automatic Persisted Queries) enabled — in-memory backend");
555        }
556
557        // Apply subscription lifecycle/limits from compiled schema.
558        if let Some(ref subs_json) = subscriptions_config_json {
559            if let Some(max) = subs_json.get("max_subscriptions_per_connection").and_then(|v| v.as_u64()) {
560                #[allow(clippy::cast_possible_truncation)]
561                // Reason: max_subscriptions_per_connection is a u32 config field; u64 → u32
562                // truncation is acceptable for a limit that would never exceed u32::MAX.
563                {
564                    server.max_subscriptions_per_connection = Some(max as u32);
565                }
566            }
567            if let Some(lifecycle) = crate::subscriptions::WebhookLifecycle::from_schema_json(subs_json) {
568                server.subscription_lifecycle = Arc::new(lifecycle);
569            }
570        }
571
572        Ok(server)
573    }
574
575    /// Shared initialization path used by both `new` and `with_relay_pagination`.
576    ///
577    /// Accepts a pre-built executor so that relay vs. non-relay constructors can supply
578    /// the appropriate variant without duplicating auth/rate-limiter/observer setup.
579    #[allow(clippy::too_many_arguments)]
580    // Reason: internal constructor that collects all pre-built subsystems; callers pass
581    // already-constructed values rather than building them here, so grouping into a
582    // builder struct would not reduce call-site clarity.
583    async fn from_executor(
584        config: ServerConfig,
585        executor: Arc<Executor<A>>,
586        subscription_manager: Arc<SubscriptionManager>,
587        circuit_breaker: Option<
588            Arc<crate::federation::circuit_breaker::FederationCircuitBreakerManager>,
589        >,
590        error_sanitizer:      Arc<crate::config::error_sanitization::ErrorSanitizer>,
591        state_encryption:     Option<Arc<crate::auth::state_encryption::StateEncryptionService>>,
592        pkce_store:           Option<Arc<crate::auth::PkceStateStore>>,
593        oidc_server_client:   Option<Arc<crate::auth::OidcServerClient>>,
594        schema_rate_limiter:  Option<Arc<RateLimiter>>,
595        api_key_authenticator: Option<Arc<crate::api_key::ApiKeyAuthenticator>>,
596        revocation_manager:   Option<Arc<crate::token_revocation::TokenRevocationManager>>,
597        trusted_docs:         Option<Arc<crate::trusted_documents::TrustedDocumentStore>>,
598        #[allow(unused_variables)] db_pool: Option<sqlx::PgPool>,
599    ) -> Result<Self> {
600        // Initialize OIDC validator if auth is configured
601        let oidc_validator = if let Some(ref auth_config) = config.auth {
602            info!(
603                issuer = %auth_config.issuer,
604                "Initializing OIDC authentication"
605            );
606            let validator = OidcValidator::new(auth_config.clone())
607                .await
608                .map_err(|e| ServerError::ConfigError(format!("Failed to initialize OIDC: {e}")))?;
609            Some(Arc::new(validator))
610        } else {
611            None
612        };
613
614        // Initialize rate limiter: compiled schema config takes priority over server config.
615        let rate_limiter = if let Some(rl) = schema_rate_limiter {
616            Some(rl)
617        } else if let Some(ref rate_config) = config.rate_limiting {
618            if rate_config.enabled {
619                info!(
620                    rps_per_ip = rate_config.rps_per_ip,
621                    rps_per_user = rate_config.rps_per_user,
622                    "Initializing rate limiting from server config"
623                );
624                let limiter_config = crate::middleware::RateLimitConfig {
625                    enabled:               true,
626                    rps_per_ip:            rate_config.rps_per_ip,
627                    rps_per_user:          rate_config.rps_per_user,
628                    burst_size:            rate_config.burst_size,
629                    cleanup_interval_secs: rate_config.cleanup_interval_secs,
630                    trust_proxy_headers:   false,
631                };
632                Some(Arc::new(RateLimiter::new(limiter_config)))
633            } else {
634                info!("Rate limiting disabled by configuration");
635                None
636            }
637        } else {
638            None
639        };
640
641        // Initialize observer runtime
642        #[cfg(feature = "observers")]
643        let observer_runtime = Self::init_observer_runtime(&config, db_pool.as_ref()).await;
644
645        // Initialize Flight service with OIDC authentication if configured
646        #[cfg(feature = "arrow")]
647        let flight_service = {
648            let mut service = FraiseQLFlightService::new();
649            if let Some(ref validator) = oidc_validator {
650                info!("Enabling OIDC authentication for Arrow Flight");
651                service.set_oidc_validator(validator.clone());
652            } else {
653                info!("Arrow Flight initialized without authentication (dev mode)");
654            }
655            Some(service)
656        };
657
658        // Warn if PKCE is configured but [auth] is missing (no OidcServerClient).
659        if pkce_store.is_some() && oidc_server_client.is_none() {
660            tracing::error!(
661                "pkce.enabled = true but [auth] is not configured or OIDC client init failed. \
662                 Auth routes (/auth/start, /auth/callback) will NOT be mounted. \
663                 Add [auth] with discovery_url, client_id, client_secret_env, and \
664                 server_redirect_uri to fraiseql.toml and recompile the schema."
665            );
666        }
667
668        // Refuse to start if FRAISEQL_REQUIRE_REDIS is set and PKCE store is in-memory.
669        Self::check_redis_requirement(pkce_store.as_ref())?;
670
671        // Spawn background PKCE state cleanup task (every 5 minutes).
672        if let Some(ref store) = pkce_store {
673            use std::time::Duration;
674            use tokio::time::MissedTickBehavior;
675            let store_clone = Arc::clone(store);
676            tokio::spawn(async move {
677                let mut ticker = tokio::time::interval(Duration::from_secs(300));
678                ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
679                loop {
680                    ticker.tick().await;
681                    store_clone.cleanup_expired().await;
682                }
683            });
684        }
685
686        Ok(Self {
687            config,
688            executor,
689            subscription_manager,
690            subscription_lifecycle: Arc::new(crate::subscriptions::NoopLifecycle),
691            max_subscriptions_per_connection: None,
692            oidc_validator,
693            rate_limiter,
694            secrets_manager: None,
695            circuit_breaker,
696            error_sanitizer,
697            state_encryption,
698            pkce_store,
699            oidc_server_client,
700            api_key_authenticator,
701            revocation_manager,
702            apq_store: None,
703            trusted_docs,
704            #[cfg(feature = "observers")]
705            observer_runtime,
706            #[cfg(feature = "observers")]
707            db_pool,
708            #[cfg(feature = "arrow")]
709            flight_service,
710            #[cfg(feature = "mcp")]
711            mcp_config: None,
712        })
713    }
714
715    /// Set lifecycle hooks for WebSocket subscriptions.
716    #[must_use]
717    pub fn with_subscription_lifecycle(
718        mut self,
719        lifecycle: Arc<dyn crate::subscriptions::SubscriptionLifecycle>,
720    ) -> Self {
721        self.subscription_lifecycle = lifecycle;
722        self
723    }
724
725    /// Set maximum subscriptions allowed per WebSocket connection.
726    #[must_use]
727    pub fn with_max_subscriptions_per_connection(mut self, max: u32) -> Self {
728        self.max_subscriptions_per_connection = Some(max);
729        self
730    }
731
732    /// Set secrets manager for the server.
733    ///
734    /// This allows attaching a secrets manager after server creation for credential management.
735    pub fn set_secrets_manager(&mut self, manager: Arc<crate::secrets_manager::SecretsManager>) {
736        self.secrets_manager = Some(manager);
737        info!("Secrets manager attached to server");
738    }
739
740    /// Serve MCP over stdio (stdin/stdout) instead of HTTP.
741    ///
742    /// This is used when `FRAISEQL_MCP_STDIO=1` is set.  The server reads JSON-RPC
743    /// messages from stdin and writes responses to stdout, following the MCP stdio
744    /// transport specification.
745    ///
746    /// # Errors
747    ///
748    /// Returns an error if MCP is not configured or the stdio transport fails.
749    #[cfg(feature = "mcp")]
750    pub async fn serve_mcp_stdio(self) -> Result<()> {
751        let mcp_cfg = self.mcp_config.ok_or_else(|| {
752            ServerError::ConfigError(
753                "FRAISEQL_MCP_STDIO=1 but MCP is not configured. \
754                 Add [mcp] enabled = true to fraiseql.toml and recompile the schema."
755                    .into(),
756            )
757        })?;
758
759        let schema = Arc::new(self.executor.schema().clone());
760        let executor = self.executor.clone();
761
762        let service = crate::mcp::handler::FraiseQLMcpService::new(
763            schema,
764            executor,
765            mcp_cfg,
766        );
767
768        info!("MCP stdio transport starting — reading from stdin, writing to stdout");
769
770        use rmcp::ServiceExt;
771        let running = service
772            .serve((tokio::io::stdin(), tokio::io::stdout()))
773            .await
774            .map_err(|e| ServerError::ConfigError(format!("MCP stdio init failed: {e}")))?;
775
776        running.waiting().await
777            .map_err(|e| ServerError::ConfigError(format!("MCP stdio error: {e}")))?;
778
779        Ok(())
780    }
781}
782
783impl<A: DatabaseAdapter + RelayDatabaseAdapter + Clone + Send + Sync + 'static> Server<A> {
784    /// Create a server with relay pagination support enabled.
785    ///
786    /// The adapter must implement [`RelayDatabaseAdapter`]. Currently, only
787    /// `PostgresAdapter` and `CachedDatabaseAdapter<PostgresAdapter>` satisfy this bound.
788    ///
789    /// Relay queries issued against a server created with [`Server::new`] return a
790    /// `Validation` error at runtime; those issued against a server created with this
791    /// constructor succeed.
792    ///
793    /// # Arguments
794    ///
795    /// * `config` - Server configuration
796    /// * `schema` - Compiled GraphQL schema
797    /// * `adapter` - Database adapter (must implement `RelayDatabaseAdapter`)
798    /// * `db_pool` - Database connection pool (optional, required for observers)
799    ///
800    /// # Errors
801    ///
802    /// Returns error if OIDC validator initialization fails.
803    ///
804    /// # Example
805    ///
806    /// ```rust,ignore
807    /// let adapter = Arc::new(PostgresAdapter::new(db_url).await?);
808    /// let server = Server::with_relay_pagination(config, schema, adapter, None).await?;
809    /// server.serve().await?;
810    /// ```
811    pub async fn with_relay_pagination(
812        config: ServerConfig,
813        schema: CompiledSchema,
814        adapter: Arc<A>,
815        db_pool: Option<sqlx::PgPool>,
816    ) -> Result<Self> {
817        // Read security configs from compiled schema BEFORE schema is moved.
818        let circuit_breaker = schema
819            .federation
820            .as_ref()
821            .and_then(crate::federation::circuit_breaker::FederationCircuitBreakerManager::from_schema_json);
822        let error_sanitizer    = Self::error_sanitizer_from_schema(&schema);
823        let state_encryption   = Self::state_encryption_from_schema(&schema)?;
824        let pkce_store         = Self::pkce_store_from_schema(&schema, state_encryption.as_ref()).await;
825        let oidc_server_client = Self::oidc_server_client_from_schema(&schema);
826        let schema_rate_limiter = Self::rate_limiter_from_schema(&schema).await;
827        let api_key_authenticator = crate::api_key::api_key_authenticator_from_schema(&schema);
828        let revocation_manager = crate::token_revocation::revocation_manager_from_schema(&schema);
829        let trusted_docs = Self::trusted_docs_from_schema(&schema);
830
831        let executor = Arc::new(Executor::new_with_relay(schema.clone(), adapter));
832        let subscription_manager = Arc::new(SubscriptionManager::new(Arc::new(schema)));
833
834        let mut server = Self::from_executor(
835            config,
836            executor,
837            subscription_manager,
838            circuit_breaker,
839            error_sanitizer,
840            state_encryption,
841            pkce_store,
842            oidc_server_client,
843            schema_rate_limiter,
844            api_key_authenticator,
845            revocation_manager,
846            trusted_docs,
847            db_pool,
848        )
849        .await?;
850
851        // Initialize MCP config from compiled schema when the feature is compiled in.
852        #[cfg(feature = "mcp")]
853        {
854            if let Some(ref mcp_json) = server.executor.schema().mcp_config {
855                match serde_json::from_value::<crate::mcp::McpConfig>(mcp_json.clone()) {
856                    Ok(cfg) if cfg.enabled => {
857                        let tool_count = crate::mcp::tools::schema_to_tools(
858                            server.executor.schema(),
859                            &cfg,
860                        ).len();
861                        info!(
862                            path = %cfg.path,
863                            transport = %cfg.transport,
864                            tools = tool_count,
865                            "MCP server configured"
866                        );
867                        server.mcp_config = Some(cfg);
868                    }
869                    Ok(_) => {}
870                    Err(e) => {
871                        warn!(error = %e, "Invalid mcp_config in compiled schema — MCP disabled");
872                    }
873                }
874            }
875        }
876
877        // Initialize APQ store when enabled.
878        if server.config.apq_enabled {
879            let apq_store: Arc<dyn fraiseql_core::apq::ApqStorage> =
880                Arc::new(fraiseql_core::apq::InMemoryApqStorage::default());
881            server.apq_store = Some(apq_store);
882            info!("APQ (Automatic Persisted Queries) enabled — in-memory backend");
883        }
884
885        Ok(server)
886    }
887}
888
889impl<A: DatabaseAdapter + Clone + Send + Sync + 'static> Server<A> {
890    /// Create new server with pre-configured Arrow Flight service.
891    ///
892    /// Use this constructor when you want to provide a Flight service with a real database adapter.
893    ///
894    /// # Arguments
895    ///
896    /// * `config` - Server configuration
897    /// * `schema` - Compiled GraphQL schema
898    /// * `adapter` - Database adapter
899    /// * `db_pool` - Database connection pool (optional, required for observers)
900    /// * `flight_service` - Pre-configured Flight service (only available with arrow feature)
901    ///
902    /// # Errors
903    ///
904    /// Returns error if OIDC validator initialization fails.
905    #[cfg(feature = "arrow")]
906    pub async fn with_flight_service(
907        config: ServerConfig,
908        schema: CompiledSchema,
909        adapter: Arc<A>,
910        #[allow(unused_variables)] db_pool: Option<sqlx::PgPool>,
911        flight_service: Option<FraiseQLFlightService>,
912    ) -> Result<Self> {
913        // Read security configs from compiled schema BEFORE schema is moved.
914        let circuit_breaker = schema
915            .federation
916            .as_ref()
917            .and_then(crate::federation::circuit_breaker::FederationCircuitBreakerManager::from_schema_json);
918        let error_sanitizer     = Self::error_sanitizer_from_schema(&schema);
919        let state_encryption    = Self::state_encryption_from_schema(&schema)?;
920        let pkce_store          = Self::pkce_store_from_schema(&schema, state_encryption.as_ref()).await;
921        let oidc_server_client  = Self::oidc_server_client_from_schema(&schema);
922        let schema_rate_limiter = Self::rate_limiter_from_schema(&schema).await;
923        let api_key_authenticator = crate::api_key::api_key_authenticator_from_schema(&schema);
924        let revocation_manager = crate::token_revocation::revocation_manager_from_schema(&schema);
925        let trusted_docs = Self::trusted_docs_from_schema(&schema);
926
927        let executor = Arc::new(Executor::new(schema.clone(), adapter));
928        let subscription_manager = Arc::new(SubscriptionManager::new(Arc::new(schema)));
929
930        // Initialize OIDC validator if auth is configured
931        let oidc_validator = if let Some(ref auth_config) = config.auth {
932            info!(
933                issuer = %auth_config.issuer,
934                "Initializing OIDC authentication"
935            );
936            let validator = OidcValidator::new(auth_config.clone())
937                .await
938                .map_err(|e| ServerError::ConfigError(format!("Failed to initialize OIDC: {e}")))?;
939            Some(Arc::new(validator))
940        } else {
941            None
942        };
943
944        // Initialize rate limiter: compiled schema config takes priority over server config.
945        let rate_limiter = if let Some(rl) = schema_rate_limiter {
946            Some(rl)
947        } else if let Some(ref rate_config) = config.rate_limiting {
948            if rate_config.enabled {
949                info!(
950                    rps_per_ip = rate_config.rps_per_ip,
951                    rps_per_user = rate_config.rps_per_user,
952                    "Initializing rate limiting from server config"
953                );
954                let limiter_config = crate::middleware::RateLimitConfig {
955                    enabled:               true,
956                    rps_per_ip:            rate_config.rps_per_ip,
957                    rps_per_user:          rate_config.rps_per_user,
958                    burst_size:            rate_config.burst_size,
959                    cleanup_interval_secs: rate_config.cleanup_interval_secs,
960                    trust_proxy_headers:   false,
961                };
962                Some(Arc::new(RateLimiter::new(limiter_config)))
963            } else {
964                info!("Rate limiting disabled by configuration");
965                None
966            }
967        } else {
968            None
969        };
970
971        // Initialize observer runtime
972        #[cfg(feature = "observers")]
973        let observer_runtime = Self::init_observer_runtime(&config, db_pool.as_ref()).await;
974
975        // Warn if PKCE is configured but [auth] is missing.
976        if pkce_store.is_some() && oidc_server_client.is_none() {
977            tracing::error!(
978                "pkce.enabled = true but [auth] is not configured or OIDC client init failed. \
979                 Auth routes will NOT be mounted."
980            );
981        }
982
983        // Refuse to start if FRAISEQL_REQUIRE_REDIS is set and PKCE store is in-memory.
984        Self::check_redis_requirement(pkce_store.as_ref())?;
985
986        // Spawn background PKCE state cleanup task (every 5 minutes).
987        if let Some(ref store) = pkce_store {
988            use std::time::Duration;
989            use tokio::time::MissedTickBehavior;
990            let store_clone = Arc::clone(store);
991            tokio::spawn(async move {
992                let mut ticker = tokio::time::interval(Duration::from_secs(300));
993                ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
994                loop {
995                    ticker.tick().await;
996                    store_clone.cleanup_expired().await;
997                }
998            });
999        }
1000
1001        let apq_enabled = config.apq_enabled;
1002
1003        Ok(Self {
1004            config,
1005            executor,
1006            subscription_manager,
1007            subscription_lifecycle: Arc::new(crate::subscriptions::NoopLifecycle),
1008            max_subscriptions_per_connection: None,
1009            oidc_validator,
1010            rate_limiter,
1011            secrets_manager: None,
1012            circuit_breaker,
1013            error_sanitizer,
1014            state_encryption,
1015            pkce_store,
1016            oidc_server_client,
1017            api_key_authenticator,
1018            revocation_manager,
1019            apq_store: if apq_enabled {
1020                Some(Arc::new(fraiseql_core::apq::InMemoryApqStorage::default())
1021                    as Arc<dyn fraiseql_core::apq::ApqStorage>)
1022            } else {
1023                None
1024            },
1025            trusted_docs,
1026            mcp_config: None,
1027            #[cfg(feature = "observers")]
1028            observer_runtime,
1029            #[cfg(feature = "observers")]
1030            db_pool,
1031            flight_service,
1032        })
1033    }
1034
1035    /// Initialize observer runtime from configuration
1036    #[cfg(feature = "observers")]
1037    async fn init_observer_runtime(
1038        config: &ServerConfig,
1039        pool: Option<&sqlx::PgPool>,
1040    ) -> Option<Arc<RwLock<ObserverRuntime>>> {
1041        // Check if enabled
1042        let observer_config = match &config.observers {
1043            Some(cfg) if cfg.enabled => cfg,
1044            _ => {
1045                info!("Observer runtime disabled");
1046                return None;
1047            },
1048        };
1049
1050        let pool = match pool {
1051            Some(p) => p,
1052            None => {
1053                warn!("No database pool provided for observers");
1054                return None;
1055            },
1056        };
1057
1058        info!("Initializing observer runtime");
1059
1060        let runtime_config = ObserverRuntimeConfig::new(pool.clone())
1061            .with_poll_interval(observer_config.poll_interval_ms)
1062            .with_batch_size(observer_config.batch_size)
1063            .with_channel_capacity(observer_config.channel_capacity);
1064
1065        let runtime = ObserverRuntime::new(runtime_config);
1066        Some(Arc::new(RwLock::new(runtime)))
1067    }
1068
1069    /// Build application router.
1070    fn build_router(&self) -> Router {
1071        let mut state = AppState::new(self.executor.clone());
1072
1073        // Attach secrets manager if configured
1074        if let Some(ref secrets_manager) = self.secrets_manager {
1075            state = state.with_secrets_manager(secrets_manager.clone());
1076            info!("SecretsManager attached to AppState");
1077        }
1078
1079        // Attach federation circuit breaker if configured
1080        if let Some(ref cb) = self.circuit_breaker {
1081            state = state.with_circuit_breaker(cb.clone());
1082            info!("Federation circuit breaker attached to AppState");
1083        }
1084
1085        // Attach error sanitizer (always present; disabled by default)
1086        state = state.with_error_sanitizer(self.error_sanitizer.clone());
1087        if self.error_sanitizer.is_enabled() {
1088            info!("Error sanitizer enabled — internal error details will be stripped from responses");
1089        }
1090
1091        // Attach API key authenticator if configured
1092        if let Some(ref api_key_auth) = self.api_key_authenticator {
1093            state = state.with_api_key_authenticator(api_key_auth.clone());
1094            info!("API key authenticator attached to AppState");
1095        }
1096
1097        // Attach state encryption service if configured
1098        match &self.state_encryption {
1099            Some(svc) => {
1100                state = state.with_state_encryption(svc.clone());
1101                info!("State encryption: enabled");
1102            },
1103            None => {
1104                info!("State encryption: disabled (no key configured)");
1105            },
1106        }
1107
1108        // Build RequestValidator from compiled schema validation config
1109        let mut validator = crate::validation::RequestValidator::new();
1110        if let Some(ref vc) = self.executor.schema().validation_config {
1111            if let Some(depth) = vc.get("max_query_depth").and_then(serde_json::Value::as_u64) {
1112                validator = validator.with_max_depth(depth as usize);
1113                info!(max_query_depth = depth, "Custom query depth limit configured");
1114            }
1115            if let Some(complexity) = vc.get("max_query_complexity").and_then(serde_json::Value::as_u64) {
1116                validator = validator.with_max_complexity(complexity as usize);
1117                info!(max_query_complexity = complexity, "Custom query complexity limit configured");
1118            }
1119        }
1120        state = state.with_validator(validator);
1121
1122        // Attach debug config from compiled schema
1123        state.debug_config.clone_from(&self.executor.schema().debug_config);
1124
1125        // Attach APQ store if configured
1126        if let Some(ref store) = self.apq_store {
1127            state = state.with_apq_store(store.clone());
1128        }
1129
1130        // Attach trusted document store if configured
1131        if let Some(ref store) = self.trusted_docs {
1132            state = state.with_trusted_docs(store.clone());
1133        }
1134
1135        let metrics = state.metrics.clone();
1136
1137        // Build GraphQL route (possibly with OIDC auth + Content-Type enforcement)
1138        // Supports both GET and POST per GraphQL over HTTP spec
1139        let graphql_router = if let Some(ref validator) = self.oidc_validator {
1140            info!(
1141                graphql_path = %self.config.graphql_path,
1142                "GraphQL endpoint protected by OIDC authentication (GET and POST)"
1143            );
1144            let auth_state = OidcAuthState::new(validator.clone());
1145            let router = Router::new()
1146                .route(
1147                    &self.config.graphql_path,
1148                    get(graphql_get_handler::<A>).post(graphql_handler::<A>),
1149                )
1150                .route_layer(middleware::from_fn_with_state(auth_state, oidc_auth_middleware));
1151
1152            if self.config.require_json_content_type {
1153                router
1154                    .route_layer(middleware::from_fn(require_json_content_type))
1155                    .with_state(state.clone())
1156            } else {
1157                router.with_state(state.clone())
1158            }
1159        } else {
1160            let router = Router::new()
1161                .route(
1162                    &self.config.graphql_path,
1163                    get(graphql_get_handler::<A>).post(graphql_handler::<A>),
1164                );
1165
1166            if self.config.require_json_content_type {
1167                router
1168                    .route_layer(middleware::from_fn(require_json_content_type))
1169                    .with_state(state.clone())
1170            } else {
1171                router.with_state(state.clone())
1172            }
1173        };
1174
1175        // Build base routes (always available without auth)
1176        let mut app = Router::new()
1177            .route(&self.config.health_path, get(health_handler::<A>))
1178            .with_state(state.clone())
1179            .merge(graphql_router);
1180
1181        // Conditionally add playground route
1182        if self.config.playground_enabled {
1183            let playground_state =
1184                PlaygroundState::new(self.config.graphql_path.clone(), self.config.playground_tool);
1185            info!(
1186                playground_path = %self.config.playground_path,
1187                playground_tool = ?self.config.playground_tool,
1188                "GraphQL playground enabled"
1189            );
1190            let playground_router = Router::new()
1191                .route(&self.config.playground_path, get(playground_handler))
1192                .with_state(playground_state);
1193            app = app.merge(playground_router);
1194        }
1195
1196        // Conditionally add subscription route (WebSocket)
1197        if self.config.subscriptions_enabled {
1198            let subscription_state = SubscriptionState::new(self.subscription_manager.clone())
1199                .with_lifecycle(self.subscription_lifecycle.clone())
1200                .with_max_subscriptions(self.max_subscriptions_per_connection);
1201            info!(
1202                subscription_path = %self.config.subscription_path,
1203                "GraphQL subscriptions enabled (graphql-transport-ws + graphql-ws protocols)"
1204            );
1205            let subscription_router = Router::new()
1206                .route(&self.config.subscription_path, get(subscription_handler))
1207                .with_state(subscription_state);
1208            app = app.merge(subscription_router);
1209        }
1210
1211        // Conditionally add introspection endpoint (with optional auth)
1212        if self.config.introspection_enabled {
1213            if self.config.introspection_require_auth {
1214                if let Some(ref validator) = self.oidc_validator {
1215                    info!(
1216                        introspection_path = %self.config.introspection_path,
1217                        "Introspection endpoint enabled (OIDC auth required)"
1218                    );
1219                    let auth_state = OidcAuthState::new(validator.clone());
1220                    let introspection_router = Router::new()
1221                        .route(&self.config.introspection_path, get(introspection_handler::<A>))
1222                        .route_layer(middleware::from_fn_with_state(
1223                            auth_state.clone(),
1224                            oidc_auth_middleware,
1225                        ))
1226                        .with_state(state.clone());
1227                    app = app.merge(introspection_router);
1228
1229                    // Schema export endpoints follow same auth as introspection
1230                    let schema_router = Router::new()
1231                        .route("/api/v1/schema.graphql", get(api::schema::export_sdl_handler::<A>))
1232                        .route("/api/v1/schema.json", get(api::schema::export_json_handler::<A>))
1233                        .route_layer(middleware::from_fn_with_state(
1234                            auth_state,
1235                            oidc_auth_middleware,
1236                        ))
1237                        .with_state(state.clone());
1238                    app = app.merge(schema_router);
1239                } else {
1240                    warn!(
1241                        "introspection_require_auth is true but no OIDC configured - introspection and schema export disabled"
1242                    );
1243                }
1244            } else {
1245                info!(
1246                    introspection_path = %self.config.introspection_path,
1247                    "Introspection endpoint enabled (no auth required - USE ONLY IN DEVELOPMENT)"
1248                );
1249                let introspection_router = Router::new()
1250                    .route(&self.config.introspection_path, get(introspection_handler::<A>))
1251                    .with_state(state.clone());
1252                app = app.merge(introspection_router);
1253
1254                // Schema export endpoints available without auth when introspection enabled without
1255                // auth
1256                let schema_router = Router::new()
1257                    .route("/api/v1/schema.graphql", get(api::schema::export_sdl_handler::<A>))
1258                    .route("/api/v1/schema.json", get(api::schema::export_json_handler::<A>))
1259                    .with_state(state.clone());
1260                app = app.merge(schema_router);
1261            }
1262        }
1263
1264        // Conditionally add metrics routes (protected by bearer token)
1265        if self.config.metrics_enabled {
1266            if let Some(ref token) = self.config.metrics_token {
1267                info!(
1268                    metrics_path = %self.config.metrics_path,
1269                    metrics_json_path = %self.config.metrics_json_path,
1270                    "Metrics endpoints enabled (bearer token required)"
1271                );
1272
1273                let auth_state = BearerAuthState::new(token.clone());
1274
1275                // Create a separate metrics router with auth middleware applied
1276                // The routes need relative paths since we use merge (not nest)
1277                let metrics_router = Router::new()
1278                    .route(&self.config.metrics_path, get(metrics_handler::<A>))
1279                    .route(&self.config.metrics_json_path, get(metrics_json_handler::<A>))
1280                    .route_layer(middleware::from_fn_with_state(auth_state, bearer_auth_middleware))
1281                    .with_state(state.clone());
1282
1283                app = app.merge(metrics_router);
1284            } else {
1285                warn!(
1286                    "metrics_enabled is true but metrics_token is not set - metrics endpoints disabled"
1287                );
1288            }
1289        }
1290
1291        // Conditionally add admin routes (protected by bearer token)
1292        if self.config.admin_api_enabled {
1293            if let Some(ref token) = self.config.admin_token {
1294                info!("Admin API endpoints enabled (bearer token required)");
1295
1296                let auth_state = BearerAuthState::new(token.clone());
1297
1298                // Create a separate admin router with auth middleware applied
1299                let admin_router = Router::new()
1300                    .route(
1301                        "/api/v1/admin/reload-schema",
1302                        post(api::admin::reload_schema_handler::<A>),
1303                    )
1304                    .route("/api/v1/admin/cache/clear", post(api::admin::cache_clear_handler::<A>))
1305                    .route("/api/v1/admin/cache/stats", get(api::admin::cache_stats_handler::<A>))
1306                    .route("/api/v1/admin/config", get(api::admin::config_handler::<A>))
1307                    .route_layer(middleware::from_fn_with_state(auth_state, bearer_auth_middleware))
1308                    .with_state(state.clone());
1309
1310                app = app.merge(admin_router);
1311            } else {
1312                warn!(
1313                    "admin_api_enabled is true but admin_token is not set - admin endpoints disabled"
1314                );
1315            }
1316        }
1317
1318        // Conditionally add design audit endpoints (with optional auth)
1319        if self.config.design_api_require_auth {
1320            if let Some(ref validator) = self.oidc_validator {
1321                info!("Design audit API endpoints enabled (OIDC auth required)");
1322                let auth_state = OidcAuthState::new(validator.clone());
1323                let design_router = Router::new()
1324                    .route(
1325                        "/design/federation-audit",
1326                        post(api::design::federation_audit_handler::<A>),
1327                    )
1328                    .route("/design/cost-audit", post(api::design::cost_audit_handler::<A>))
1329                    .route("/design/cache-audit", post(api::design::cache_audit_handler::<A>))
1330                    .route("/design/auth-audit", post(api::design::auth_audit_handler::<A>))
1331                    .route(
1332                        "/design/compilation-audit",
1333                        post(api::design::compilation_audit_handler::<A>),
1334                    )
1335                    .route("/design/audit", post(api::design::overall_design_audit_handler::<A>))
1336                    .route_layer(middleware::from_fn_with_state(auth_state, oidc_auth_middleware))
1337                    .with_state(state.clone());
1338                app = app.nest("/api/v1", design_router);
1339            } else {
1340                warn!(
1341                    "design_api_require_auth is true but no OIDC configured - design endpoints unprotected"
1342                );
1343                // Add unprotected design endpoints
1344                let design_router = Router::new()
1345                    .route(
1346                        "/design/federation-audit",
1347                        post(api::design::federation_audit_handler::<A>),
1348                    )
1349                    .route("/design/cost-audit", post(api::design::cost_audit_handler::<A>))
1350                    .route("/design/cache-audit", post(api::design::cache_audit_handler::<A>))
1351                    .route("/design/auth-audit", post(api::design::auth_audit_handler::<A>))
1352                    .route(
1353                        "/design/compilation-audit",
1354                        post(api::design::compilation_audit_handler::<A>),
1355                    )
1356                    .route("/design/audit", post(api::design::overall_design_audit_handler::<A>))
1357                    .with_state(state.clone());
1358                app = app.nest("/api/v1", design_router);
1359            }
1360        } else {
1361            info!("Design audit API endpoints enabled (no auth required)");
1362            let design_router = Router::new()
1363                .route("/design/federation-audit", post(api::design::federation_audit_handler::<A>))
1364                .route("/design/cost-audit", post(api::design::cost_audit_handler::<A>))
1365                .route("/design/cache-audit", post(api::design::cache_audit_handler::<A>))
1366                .route("/design/auth-audit", post(api::design::auth_audit_handler::<A>))
1367                .route(
1368                    "/design/compilation-audit",
1369                    post(api::design::compilation_audit_handler::<A>),
1370                )
1371                .route("/design/audit", post(api::design::overall_design_audit_handler::<A>))
1372                .with_state(state.clone());
1373            app = app.nest("/api/v1", design_router);
1374        }
1375
1376        // PKCE OAuth2 auth routes — mounted only when both pkce and [auth] are configured.
1377        if let (Some(store), Some(client)) = (&self.pkce_store, &self.oidc_server_client) {
1378            let auth_state = Arc::new(AuthPkceState {
1379                pkce_store:              Arc::clone(store),
1380                oidc_client:             Arc::clone(client),
1381                http_client:             Arc::new(reqwest::Client::new()),
1382                post_login_redirect_uri: None,
1383            });
1384            let auth_router = Router::new()
1385                .route("/auth/start",    get(auth_start))
1386                .route("/auth/callback", get(auth_callback))
1387                .with_state(auth_state);
1388            app = app.merge(auth_router);
1389            info!("PKCE auth routes mounted: GET /auth/start, GET /auth/callback");
1390        }
1391
1392        // Token revocation routes — mounted only when revocation is configured.
1393        if let Some(ref rev_mgr) = self.revocation_manager {
1394            let rev_state = Arc::new(crate::routes::RevocationRouteState {
1395                revocation_manager: Arc::clone(rev_mgr),
1396            });
1397            let rev_router = Router::new()
1398                .route("/auth/revoke",     post(crate::routes::revoke_token))
1399                .route("/auth/revoke-all", post(crate::routes::revoke_all_tokens))
1400                .with_state(rev_state);
1401            app = app.merge(rev_router);
1402            info!("Token revocation routes mounted: POST /auth/revoke, POST /auth/revoke-all");
1403        }
1404
1405        // MCP (Model Context Protocol) route — mounted when mcp feature is compiled in
1406        // and mcp_config is present.
1407        #[cfg(feature = "mcp")]
1408        if let Some(ref mcp_cfg) = self.mcp_config {
1409            if mcp_cfg.transport == "http" || mcp_cfg.transport == "both" {
1410                use rmcp::transport::{StreamableHttpServerConfig, StreamableHttpService};
1411                use rmcp::transport::streamable_http_server::session::local::LocalSessionManager;
1412
1413                let schema = Arc::new(self.executor.schema().clone());
1414                let executor = self.executor.clone();
1415                let cfg = mcp_cfg.clone();
1416                let mcp_service = StreamableHttpService::new(
1417                    move || {
1418                        Ok(crate::mcp::handler::FraiseQLMcpService::new(
1419                            schema.clone(),
1420                            executor.clone(),
1421                            cfg.clone(),
1422                        ))
1423                    },
1424                    Arc::new(LocalSessionManager::default()),
1425                    StreamableHttpServerConfig::default(),
1426                );
1427                app = app.nest_service(&mcp_cfg.path, mcp_service);
1428                info!(path = %mcp_cfg.path, "MCP HTTP endpoint mounted");
1429            }
1430        }
1431
1432        // Remaining API routes (query intelligence, federation)
1433        let api_router = api::routes(state.clone());
1434        app = app.nest("/api/v1", api_router);
1435
1436        // RBAC Management API (if database pool available)
1437        #[cfg(feature = "observers")]
1438        if let Some(ref db_pool) = self.db_pool {
1439            info!("Adding RBAC Management API endpoints");
1440            let rbac_backend = Arc::new(
1441                crate::api::rbac_management::db_backend::RbacDbBackend::new(db_pool.clone()),
1442            );
1443            let rbac_state = crate::api::RbacManagementState { db: rbac_backend };
1444            let rbac_router = crate::api::rbac_management_router(rbac_state);
1445            app = app.merge(rbac_router);
1446        }
1447
1448        // Add HTTP metrics middleware (tracks requests and response status codes)
1449        // This runs on ALL routes, even when metrics endpoints are disabled
1450        app = app.layer(middleware::from_fn_with_state(metrics, metrics_middleware));
1451
1452        // Observer routes (if enabled and compiled with feature)
1453        #[cfg(feature = "observers")]
1454        {
1455            app = self.add_observer_routes(app);
1456        }
1457
1458        // Add middleware
1459        if self.config.tracing_enabled {
1460            app = app.layer(trace_layer());
1461        }
1462
1463        if self.config.cors_enabled {
1464            // Use restricted CORS with configured origins
1465            let origins = if self.config.cors_origins.is_empty() {
1466                // Default to localhost for development if no origins configured
1467                tracing::warn!(
1468                    "CORS enabled but no origins configured. Using localhost:3000 as default. \
1469                     Set cors_origins in config for production."
1470                );
1471                vec!["http://localhost:3000".to_string()]
1472            } else {
1473                self.config.cors_origins.clone()
1474            };
1475            app = app.layer(cors_layer_restricted(origins));
1476        }
1477
1478        // Add request body size limit (default 1 MB — prevents memory exhaustion)
1479        if self.config.max_request_body_bytes > 0 {
1480            info!(
1481                max_bytes = self.config.max_request_body_bytes,
1482                "Request body size limit enabled"
1483            );
1484            app = app.layer(DefaultBodyLimit::max(self.config.max_request_body_bytes));
1485        }
1486
1487        // Add rate limiting middleware if configured
1488        if let Some(ref limiter) = self.rate_limiter {
1489            use std::net::SocketAddr;
1490
1491            use axum::extract::ConnectInfo;
1492
1493            info!("Enabling rate limiting middleware");
1494            let limiter_clone = limiter.clone();
1495            app = app.layer(middleware::from_fn(move |ConnectInfo(addr): ConnectInfo<SocketAddr>, req, next: axum::middleware::Next| {
1496                let limiter = limiter_clone.clone();
1497                async move {
1498                    let ip = addr.ip().to_string();
1499
1500                    // Check rate limit
1501                    let check = limiter.check_ip_limit(&ip).await;
1502                    if !check.allowed {
1503                        warn!(ip = %ip, "IP rate limit exceeded");
1504                        use axum::http::StatusCode;
1505                        use axum::response::IntoResponse;
1506                        let retry = check.retry_after_secs;
1507                        let retry_str = retry.to_string();
1508                        let body = format!(
1509                            r#"{{"errors":[{{"message":"Rate limit exceeded. Please retry after {retry} second{s}."}}]}}"#,
1510                            s = if retry == 1 { "" } else { "s" }
1511                        );
1512                        return (
1513                            StatusCode::TOO_MANY_REQUESTS,
1514                            [("Content-Type", "application/json"), ("Retry-After", retry_str.as_str())],
1515                            body,
1516                        ).into_response();
1517                    }
1518
1519                    // Get remaining tokens for headers
1520                    let remaining = check.remaining;
1521                    let mut response = next.run(req).await;
1522
1523                    // Add rate limit headers
1524                    let headers = response.headers_mut();
1525                    if let Ok(limit_value) = format!("{}", limiter.config().rps_per_ip).parse() {
1526                        headers.insert("X-RateLimit-Limit", limit_value);
1527                    }
1528                    if let Ok(remaining_value) = format!("{}", remaining as u32).parse() {
1529                        headers.insert("X-RateLimit-Remaining", remaining_value);
1530                    }
1531
1532                    response
1533                }
1534            }));
1535        }
1536
1537        app
1538    }
1539
1540    /// Add observer-related routes to the router
1541    #[cfg(feature = "observers")]
1542    fn add_observer_routes(&self, app: Router) -> Router {
1543        use crate::observers::{
1544            ObserverRepository, ObserverState, RuntimeHealthState, observer_routes,
1545            observer_runtime_routes,
1546        };
1547
1548        // Management API (always available with feature)
1549        let observer_state = ObserverState {
1550            repository: ObserverRepository::new(
1551                self.db_pool.clone().expect("Pool required for observers"),
1552            ),
1553        };
1554
1555        let app = app.nest("/api/observers", observer_routes(observer_state));
1556
1557        // Runtime health API (only if runtime present)
1558        if let Some(ref runtime) = self.observer_runtime {
1559            info!(
1560                path = "/api/observers",
1561                "Observer management and runtime health endpoints enabled"
1562            );
1563
1564            let runtime_state = RuntimeHealthState {
1565                runtime: runtime.clone(),
1566            };
1567
1568            app.merge(observer_runtime_routes(runtime_state))
1569        } else {
1570            app
1571        }
1572    }
1573
1574    /// Start server and listen for requests.
1575    ///
1576    /// # Errors
1577    ///
1578    /// Returns error if server fails to bind or encounters runtime errors.
1579    pub async fn serve(self) -> Result<()> {
1580        self.serve_with_shutdown(Self::shutdown_signal()).await
1581    }
1582
1583    /// Start server with a custom shutdown future.
1584    ///
1585    /// Enables programmatic shutdown (e.g., for `--watch` hot-reload) by accepting any
1586    /// future that resolves when the server should stop.
1587    ///
1588    /// # Errors
1589    ///
1590    /// Returns error if server fails to bind or encounters runtime errors.
1591    pub async fn serve_with_shutdown<F>(self, shutdown: F) -> Result<()>
1592    where
1593        F: std::future::Future<Output = ()> + Send + 'static,
1594    {
1595        let app = self.build_router();
1596
1597        // Initialize TLS setup
1598        let tls_setup = TlsSetup::new(self.config.tls.clone(), self.config.database_tls.clone())?;
1599
1600        info!(
1601            bind_addr = %self.config.bind_addr,
1602            graphql_path = %self.config.graphql_path,
1603            tls_enabled = tls_setup.is_tls_enabled(),
1604            "Starting FraiseQL server"
1605        );
1606
1607        // Start observer runtime if configured
1608        #[cfg(feature = "observers")]
1609        if let Some(ref runtime) = self.observer_runtime {
1610            info!("Starting observer runtime...");
1611            let mut guard = runtime.write().await;
1612
1613            match guard.start().await {
1614                Ok(()) => info!("Observer runtime started"),
1615                Err(e) => {
1616                    error!("Failed to start observer runtime: {}", e);
1617                    warn!("Server will continue without observers");
1618                },
1619            }
1620            drop(guard);
1621        }
1622
1623        let listener = TcpListener::bind(self.config.bind_addr)
1624            .await
1625            .map_err(|e| ServerError::BindError(e.to_string()))?;
1626
1627        // Log TLS configuration
1628        if tls_setup.is_tls_enabled() {
1629            // Verify TLS setup is valid (will error if certificates are missing/invalid)
1630            let _ = tls_setup.create_rustls_config()?;
1631            info!(
1632                cert_path = ?tls_setup.cert_path(),
1633                key_path = ?tls_setup.key_path(),
1634                mtls_required = tls_setup.is_mtls_required(),
1635                "Server TLS configuration loaded (note: use reverse proxy for server-side TLS termination)"
1636            );
1637        }
1638
1639        // Log database TLS configuration
1640        info!(
1641            postgres_ssl_mode = tls_setup.postgres_ssl_mode(),
1642            redis_ssl = tls_setup.redis_ssl_enabled(),
1643            clickhouse_https = tls_setup.clickhouse_https_enabled(),
1644            elasticsearch_https = tls_setup.elasticsearch_https_enabled(),
1645            "Database connection TLS configuration applied"
1646        );
1647
1648        info!("Server listening on http://{}", self.config.bind_addr);
1649
1650        // Start both HTTP and gRPC servers concurrently if Arrow Flight is enabled
1651        #[cfg(feature = "arrow")]
1652        if let Some(flight_service) = self.flight_service {
1653            // Flight server runs on port 50051
1654            let flight_addr = "0.0.0.0:50051".parse().expect("Valid Flight address");
1655            info!("Arrow Flight server listening on grpc://{}", flight_addr);
1656
1657            // Spawn Flight server in background
1658            let flight_server = tokio::spawn(async move {
1659                tonic::transport::Server::builder()
1660                    .add_service(flight_service.into_server())
1661                    .serve(flight_addr)
1662                    .await
1663            });
1664
1665            // Wrap the user-supplied shutdown future so we can also stop observer runtime
1666            #[cfg(feature = "observers")]
1667            let observer_runtime = self.observer_runtime.clone();
1668
1669            let shutdown_with_cleanup = async move {
1670                shutdown.await;
1671                #[cfg(feature = "observers")]
1672                if let Some(ref runtime) = observer_runtime {
1673                    info!("Shutting down observer runtime");
1674                    let mut guard = runtime.write().await;
1675                    if let Err(e) = guard.stop().await {
1676                        #[cfg(feature = "observers")]
1677                        error!("Error stopping runtime: {}", e);
1678                    } else {
1679                        info!("Runtime stopped cleanly");
1680                    }
1681                }
1682            };
1683
1684            // Run HTTP server with graceful shutdown
1685            axum::serve(listener, app)
1686                .with_graceful_shutdown(shutdown_with_cleanup)
1687                .await
1688                .map_err(|e| ServerError::IoError(std::io::Error::other(e)))?;
1689
1690            // Abort Flight server after HTTP server exits
1691            flight_server.abort();
1692        }
1693
1694        // HTTP-only server (when arrow feature not enabled)
1695        #[cfg(not(feature = "arrow"))]
1696        {
1697            axum::serve(listener, app)
1698                .with_graceful_shutdown(shutdown)
1699                .await
1700                .map_err(|e| ServerError::IoError(std::io::Error::other(e)))?;
1701        }
1702
1703        Ok(())
1704    }
1705
1706    /// Listen for shutdown signals (Ctrl+C or SIGTERM)
1707    pub async fn shutdown_signal() {
1708        use tokio::signal;
1709
1710        let ctrl_c = async {
1711            signal::ctrl_c().await.expect("Failed to install Ctrl+C handler");
1712        };
1713
1714        #[cfg(unix)]
1715        let terminate = async {
1716            signal::unix::signal(signal::unix::SignalKind::terminate())
1717                .expect("Failed to install SIGTERM handler")
1718                .recv()
1719                .await;
1720        };
1721
1722        #[cfg(not(unix))]
1723        let terminate = std::future::pending::<()>();
1724
1725        tokio::select! {
1726            _ = ctrl_c => info!("Received Ctrl+C"),
1727            _ = terminate => info!("Received SIGTERM"),
1728        }
1729    }
1730}