1use std::sync::Arc;
4
5use axum::{
6 Router, extract::DefaultBodyLimit, middleware,
7 routing::{get, post},
8};
9#[cfg(feature = "arrow")]
10use fraiseql_arrow::FraiseQLFlightService;
11use fraiseql_core::{
12 db::traits::{DatabaseAdapter, RelayDatabaseAdapter},
13 runtime::{Executor, SubscriptionManager},
14 schema::CompiledSchema,
15 security::OidcValidator,
16};
17use tokio::net::TcpListener;
18#[cfg(any(feature = "observers", feature = "redis-rate-limiting", feature = "redis-pkce"))]
19use tracing::error;
20use tracing::{info, warn};
21#[cfg(feature = "observers")]
22use {
23 crate::observers::{ObserverRuntime, ObserverRuntimeConfig},
24 tokio::sync::RwLock,
25};
26
27use crate::{
28 Result, ServerError,
29 middleware::{
30 BearerAuthState, OidcAuthState, RateLimiter, bearer_auth_middleware, cors_layer_restricted,
31 metrics_middleware, oidc_auth_middleware, require_json_content_type, trace_layer,
32 },
33 routes::{
34 AuthPkceState, PlaygroundState, SubscriptionState, api, auth_callback, auth_start,
35 graphql::AppState, graphql_get_handler, graphql_handler, health_handler,
36 introspection_handler, metrics_handler, metrics_json_handler, playground_handler,
37 subscription_handler,
38 },
39 server_config::ServerConfig,
40 tls::TlsSetup,
41};
42
/// FraiseQL HTTP server, generic over the database adapter `A`.
///
/// Holds the query executor plus every optional subsystem (auth, rate limiting,
/// PKCE, APQ, trusted documents, ...) that the constructors derive from the
/// compiled schema and the [`ServerConfig`]. Feature-gated fields only exist
/// when the corresponding Cargo feature is enabled.
pub struct Server<A: DatabaseAdapter> {
    /// Server configuration supplied by the caller (auth, rate limiting, APQ, paths, toggles).
    config: ServerConfig,
    /// Query executor built from the compiled schema and the database adapter.
    executor: Arc<Executor<A>>,
    /// Subscription manager built from the compiled schema.
    subscription_manager: Arc<SubscriptionManager>,
    /// Subscription lifecycle hooks; constructors start with `NoopLifecycle`.
    subscription_lifecycle: Arc<dyn crate::subscriptions::SubscriptionLifecycle>,
    /// Optional cap on subscriptions per connection; `None` when not configured.
    max_subscriptions_per_connection: Option<u32>,
    /// OIDC token validator; present only when `config.auth` is set.
    oidc_validator: Option<Arc<OidcValidator>>,
    /// Rate limiter from the compiled schema, or else from `config.rate_limiting`.
    rate_limiter: Option<Arc<RateLimiter>>,
    /// Secrets manager; `None` until `set_secrets_manager` is called.
    secrets_manager: Option<Arc<crate::secrets_manager::SecretsManager>>,
    /// Federation circuit breaker derived from the schema's `federation` section.
    circuit_breaker:
        Option<Arc<crate::federation::circuit_breaker::FederationCircuitBreakerManager>>,
    /// Error sanitizer; always present, but may be a disabled instance.
    error_sanitizer: Arc<crate::config::error_sanitization::ErrorSanitizer>,
    /// State-encryption service derived from the schema's `security` section.
    state_encryption: Option<Arc<crate::auth::state_encryption::StateEncryptionService>>,
    /// PKCE state store; present when `security.pkce.enabled` is true.
    pkce_store: Option<Arc<crate::auth::PkceStateStore>>,
    /// Server-side OIDC client built from the compiled schema.
    oidc_server_client: Option<Arc<crate::auth::OidcServerClient>>,
    /// API-key authenticator built from the compiled schema.
    api_key_authenticator: Option<Arc<crate::api_key::ApiKeyAuthenticator>>,
    /// Token revocation manager built from the compiled schema.
    revocation_manager: Option<Arc<crate::token_revocation::TokenRevocationManager>>,
    /// Automatic Persisted Queries storage; set when `config.apq_enabled` is true.
    apq_store: Option<Arc<dyn fraiseql_core::apq::ApqStorage>>,
    /// Trusted-document store loaded from the schema's `security.trusted_documents`.
    trusted_docs: Option<Arc<crate::trusted_documents::TrustedDocumentStore>>,

    /// Observer runtime; `None` when observers are disabled or no pool was supplied.
    #[cfg(feature = "observers")]
    observer_runtime: Option<Arc<RwLock<ObserverRuntime>>>,

    /// PostgreSQL pool passed in by the caller and handed to the observer runtime.
    #[cfg(feature = "observers")]
    db_pool: Option<sqlx::PgPool>,

    /// Arrow Flight service instance.
    #[cfg(feature = "arrow")]
    flight_service: Option<FraiseQLFlightService>,

    /// MCP configuration parsed from the compiled schema; `None` when absent or disabled.
    #[cfg(feature = "mcp")]
    mcp_config: Option<crate::mcp::McpConfig>,
}
76
77impl<A: DatabaseAdapter + Clone + Send + Sync + 'static> Server<A> {
78 fn state_encryption_from_schema(
86 schema: &CompiledSchema,
87 ) -> crate::Result<Option<Arc<crate::auth::state_encryption::StateEncryptionService>>> {
88 match schema.security.as_ref() {
89 None => Ok(None),
90 Some(s) => {
91 crate::auth::state_encryption::StateEncryptionService::from_compiled_schema(s)
92 .map_err(|e| ServerError::ConfigError(e.to_string()))
93 },
94 }
95 }
96
97 async fn pkce_store_from_schema(
103 schema: &CompiledSchema,
104 state_encryption: Option<&Arc<crate::auth::state_encryption::StateEncryptionService>>,
105 ) -> Option<Arc<crate::auth::PkceStateStore>> {
106 let security = schema.security.as_ref()?;
107 let pkce_cfg = security.get("pkce")?;
108
109 #[derive(serde::Deserialize)]
110 struct PkceCfgMinimal {
111 #[serde(default)]
112 enabled: bool,
113 #[serde(default = "default_ttl")]
114 state_ttl_secs: u64,
115 #[serde(default = "default_method")]
116 code_challenge_method: String,
117 redis_url: Option<String>,
118 }
119 fn default_ttl() -> u64 { 600 }
120 fn default_method() -> String { "S256".into() }
121
122 let cfg: PkceCfgMinimal = serde_json::from_value(pkce_cfg.clone()).ok()?;
123 if !cfg.enabled {
124 return None;
125 }
126
127 if state_encryption.is_none() {
128 warn!(
129 "pkce.enabled = true but state_encryption is disabled. \
130 PKCE state tokens are sent to the OIDC provider unencrypted. \
131 Enable [security.state_encryption] in production for full protection."
132 );
133 }
134
135 if cfg.code_challenge_method.eq_ignore_ascii_case("plain") {
136 warn!(
137 "pkce.code_challenge_method = \"plain\" is insecure. \
138 Use \"S256\" in all production environments."
139 );
140 }
141
142 let enc = state_encryption.cloned();
143
144 #[cfg(feature = "redis-pkce")]
146 if let Some(ref url) = cfg.redis_url {
147 match crate::auth::PkceStateStore::new_redis(url, cfg.state_ttl_secs, enc.clone())
148 .await
149 {
150 Ok(store) => {
151 info!(redis_url = %url, "PKCE state store: Redis backend");
152 return Some(Arc::new(store));
153 }
154 Err(e) => {
155 error!(
156 error = %e,
157 redis_url = %url,
158 "Failed to connect to Redis PKCE store — falling back to in-memory"
159 );
160 }
161 }
162 }
163
164 #[cfg(not(feature = "redis-pkce"))]
165 if cfg.redis_url.is_some() {
166 warn!(
167 "pkce.redis_url is set but the `redis-pkce` Cargo feature is not compiled in. \
168 Rebuild with `--features redis-pkce` to enable the Redis PKCE backend. \
169 Falling back to in-memory storage."
170 );
171 }
172
173 warn!(
174 "PKCE state store: in-memory. In a multi-replica deployment, auth flows will fail \
175 if /auth/start and /auth/callback hit different replicas. \
176 Set [security.pkce] redis_url to enable the Redis backend, \
177 or FRAISEQL_REQUIRE_REDIS=1 to enforce it at startup."
178 );
179
180 Some(Arc::new(crate::auth::PkceStateStore::new(cfg.state_ttl_secs, enc)))
181 }
182
183 fn check_redis_requirement(
194 pkce_store: Option<&Arc<crate::auth::PkceStateStore>>,
195 ) -> crate::Result<()> {
196 if std::env::var("FRAISEQL_REQUIRE_REDIS").is_ok() {
197 let pkce_in_memory = pkce_store.is_some_and(|s| s.is_in_memory());
198 if pkce_in_memory {
199 return Err(ServerError::ConfigError(concat!(
200 "FraiseQL failed to start\n\n",
201 " FRAISEQL_REQUIRE_REDIS is set but PKCE auth state is using in-memory storage.\n",
202 " In a multi-replica deployment, auth callbacks can fail if they hit a\n",
203 " different replica than the one that handled /auth/start.\n\n",
204 " To fix:\n",
205 " [security.pkce]\n",
206 " redis_url = \"redis://localhost:6379\"\n\n",
207 " [security.rate_limiting]\n",
208 " redis_url = \"redis://localhost:6379\"\n\n",
209 " To allow in-memory (single-replica only):\n",
210 " Unset FRAISEQL_REQUIRE_REDIS",
211 )
212 .into()));
213 }
214 }
215 Ok(())
216 }
217
218 fn oidc_server_client_from_schema(
220 schema: &CompiledSchema,
221 ) -> Option<Arc<crate::auth::OidcServerClient>> {
222 let schema_json = serde_json::to_value(schema).ok()?;
226 crate::auth::OidcServerClient::from_compiled_schema(&schema_json)
227 }
228
229 async fn rate_limiter_from_schema(schema: &CompiledSchema) -> Option<Arc<RateLimiter>> {
237 let sec: crate::middleware::RateLimitingSecurityConfig = schema
238 .security
239 .as_ref()
240 .and_then(|s| s.get("rate_limiting"))
241 .and_then(|v| serde_json::from_value(v.clone()).ok())?;
242
243 if !sec.enabled {
244 return None;
245 }
246
247 let config = crate::middleware::RateLimitConfig::from_security_config(&sec);
248
249 let limiter: RateLimiter = if let Some(ref redis_url) = sec.redis_url {
250 #[cfg(feature = "redis-rate-limiting")]
251 {
252 match RateLimiter::new_redis(redis_url, config.clone()).await {
253 Ok(rl) => {
254 info!(
255 url = redis_url.as_str(),
256 rps_per_ip = config.rps_per_ip,
257 burst_size = config.burst_size,
258 "Rate limiting: using Redis distributed backend"
259 );
260 rl.with_path_rules_from_security(&sec)
261 },
262 Err(e) => {
263 error!(
264 error = %e,
265 "Failed to connect to Redis for rate limiting — \
266 falling back to in-memory backend"
267 );
268 RateLimiter::new(config).with_path_rules_from_security(&sec)
269 },
270 }
271 }
272 #[cfg(not(feature = "redis-rate-limiting"))]
273 {
274 let _ = redis_url;
275 warn!(
276 "rate_limiting.redis_url is set but the server was compiled without the \
277 'redis-rate-limiting' feature. Using in-memory backend."
278 );
279 RateLimiter::new(config).with_path_rules_from_security(&sec)
280 }
281 } else {
282 info!(
283 rps_per_ip = config.rps_per_ip,
284 burst_size = config.burst_size,
285 "Rate limiting: using in-memory backend"
286 );
287 RateLimiter::new(config).with_path_rules_from_security(&sec)
288 };
289
290 Some(Arc::new(limiter))
291 }
292
293 fn error_sanitizer_from_schema(
296 schema: &CompiledSchema,
297 ) -> Arc<crate::config::error_sanitization::ErrorSanitizer> {
298 let sanitizer = schema
299 .security
300 .as_ref()
301 .and_then(|s| s.get("error_sanitization"))
302 .and_then(|v| {
303 serde_json::from_value::<
304 crate::config::error_sanitization::ErrorSanitizationConfig,
305 >(v.clone())
306 .ok()
307 })
308 .map(crate::config::error_sanitization::ErrorSanitizer::new)
309 .unwrap_or_else(crate::config::error_sanitization::ErrorSanitizer::disabled);
310 Arc::new(sanitizer)
311 }
312
313 fn trusted_docs_from_schema(
316 schema: &CompiledSchema,
317 ) -> Option<Arc<crate::trusted_documents::TrustedDocumentStore>> {
318 let security = schema.security.as_ref()?;
319 let td_cfg = security.get("trusted_documents")?;
320
321 #[derive(serde::Deserialize)]
322 struct TdCfgMinimal {
323 #[serde(default)]
324 enabled: bool,
325 #[serde(default)]
326 mode: String,
327 manifest_path: Option<String>,
328 #[allow(dead_code)]
329 manifest_url: Option<String>,
330 #[serde(default)]
331 reload_interval_secs: u64,
332 }
333
334 let cfg: TdCfgMinimal = serde_json::from_value(td_cfg.clone()).ok()?;
335 if !cfg.enabled {
336 return None;
337 }
338
339 let mode = if cfg.mode.eq_ignore_ascii_case("strict") {
340 crate::trusted_documents::TrustedDocumentMode::Strict
341 } else {
342 crate::trusted_documents::TrustedDocumentMode::Permissive
343 };
344
345 if let Some(ref path) = cfg.manifest_path {
346 match crate::trusted_documents::TrustedDocumentStore::from_manifest_file(
347 std::path::Path::new(path),
348 mode,
349 ) {
350 Ok(store) => {
351 let store = Arc::new(store);
352 if cfg.reload_interval_secs > 0 {
354 if let Some(ref url) = cfg.manifest_url {
355 Self::spawn_trusted_docs_reload(
356 Arc::clone(&store),
357 url.clone(),
358 cfg.reload_interval_secs,
359 );
360 } else {
361 warn!(
362 "trusted_documents.reload_interval_secs > 0 but no manifest_url set \
363 — hot-reload disabled (file-based manifests must be reloaded manually)"
364 );
365 }
366 }
367 info!(
368 manifest = %path,
369 mode = ?mode,
370 "Trusted documents loaded"
371 );
372 Some(store)
373 }
374 Err(e) => {
375 tracing::error!(error = %e, "Failed to load trusted documents manifest");
376 None
377 }
378 }
379 } else {
380 warn!("trusted_documents.enabled = true but no manifest_path or manifest_url set");
381 None
382 }
383 }
384
385 fn spawn_trusted_docs_reload(
387 store: Arc<crate::trusted_documents::TrustedDocumentStore>,
388 url: String,
389 interval_secs: u64,
390 ) {
391 tokio::spawn(async move {
392 let mut ticker =
393 tokio::time::interval(std::time::Duration::from_secs(interval_secs));
394 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
395 loop {
396 ticker.tick().await;
397 match reqwest::get(&url).await {
398 Ok(resp) => match resp.text().await {
399 Ok(body) => {
400 #[derive(serde::Deserialize)]
401 struct Manifest {
402 documents: std::collections::HashMap<String, String>,
403 }
404 match serde_json::from_str::<Manifest>(&body) {
405 Ok(manifest) => {
406 let count = manifest.documents.len();
407 store.replace_documents(manifest.documents).await;
408 info!(
409 count,
410 "Trusted documents manifest reloaded"
411 );
412 }
413 Err(e) => {
414 warn!(error = %e, "Failed to parse trusted documents manifest");
415 }
416 }
417 }
418 Err(e) => {
419 warn!(error = %e, "Failed to read trusted documents manifest response");
420 }
421 },
422 Err(e) => {
423 warn!(error = %e, "Failed to fetch trusted documents manifest");
424 }
425 }
426 }
427 });
428 }
429
430 pub async fn new(
459 config: ServerConfig,
460 schema: CompiledSchema,
461 adapter: Arc<A>,
462 #[allow(unused_variables)] db_pool: Option<sqlx::PgPool>,
463 ) -> Result<Self> {
464 let circuit_breaker = schema
466 .federation
467 .as_ref()
468 .and_then(crate::federation::circuit_breaker::FederationCircuitBreakerManager::from_schema_json);
469 let error_sanitizer = Self::error_sanitizer_from_schema(&schema);
470 let state_encryption = Self::state_encryption_from_schema(&schema)?;
471 let pkce_store = Self::pkce_store_from_schema(&schema, state_encryption.as_ref()).await;
472 let oidc_server_client = Self::oidc_server_client_from_schema(&schema);
473 let schema_rate_limiter = Self::rate_limiter_from_schema(&schema).await;
474 let api_key_authenticator = crate::api_key::api_key_authenticator_from_schema(&schema);
475 if api_key_authenticator.is_some() {
476 info!("API key authentication enabled");
477 }
478 let revocation_manager = crate::token_revocation::revocation_manager_from_schema(&schema);
479 if revocation_manager.is_some() {
480 info!("Token revocation enabled");
481 }
482 let trusted_docs = Self::trusted_docs_from_schema(&schema);
483
484 if config.cache_enabled && !schema.has_rls_configured() {
489 warn!(
490 "Query-result caching is enabled but no Row-Level Security policies are declared \
491 in the compiled schema. Cache isolation relies on per-user WHERE clauses in cache \
492 keys. Without RLS, users with the same query and variables will receive the same \
493 cached response. This is safe for single-tenant deployments but WILL LEAK DATA \
494 between tenants in multi-tenant deployments. Declare policies in fraiseql.toml \
495 or set cache_enabled = false if you are using PostgreSQL-native RLS without \
496 FraiseQL policy injection."
497 );
498 }
499
500 let subscriptions_config_json = schema.subscriptions_config.clone();
502
503 let executor = Arc::new(Executor::new(schema.clone(), adapter));
504 let subscription_manager = Arc::new(SubscriptionManager::new(Arc::new(schema)));
505
506 let mut server = Self::from_executor(
507 config,
508 executor,
509 subscription_manager,
510 circuit_breaker,
511 error_sanitizer,
512 state_encryption,
513 pkce_store,
514 oidc_server_client,
515 schema_rate_limiter,
516 api_key_authenticator,
517 revocation_manager,
518 trusted_docs,
519 db_pool,
520 )
521 .await?;
522
523 #[cfg(feature = "mcp")]
525 {
526 if let Some(ref mcp_json) = server.executor.schema().mcp_config {
527 match serde_json::from_value::<crate::mcp::McpConfig>(mcp_json.clone()) {
528 Ok(cfg) if cfg.enabled => {
529 let tool_count = crate::mcp::tools::schema_to_tools(
530 server.executor.schema(),
531 &cfg,
532 ).len();
533 info!(
534 path = %cfg.path,
535 transport = %cfg.transport,
536 tools = tool_count,
537 "MCP server configured"
538 );
539 server.mcp_config = Some(cfg);
540 }
541 Ok(_) => {}
542 Err(e) => {
543 warn!(error = %e, "Invalid mcp_config in compiled schema — MCP disabled");
544 }
545 }
546 }
547 }
548
549 if server.config.apq_enabled {
551 let apq_store: Arc<dyn fraiseql_core::apq::ApqStorage> =
552 Arc::new(fraiseql_core::apq::InMemoryApqStorage::default());
553 server.apq_store = Some(apq_store);
554 info!("APQ (Automatic Persisted Queries) enabled — in-memory backend");
555 }
556
557 if let Some(ref subs_json) = subscriptions_config_json {
559 if let Some(max) = subs_json.get("max_subscriptions_per_connection").and_then(|v| v.as_u64()) {
560 #[allow(clippy::cast_possible_truncation)]
561 {
564 server.max_subscriptions_per_connection = Some(max as u32);
565 }
566 }
567 if let Some(lifecycle) = crate::subscriptions::WebhookLifecycle::from_schema_json(subs_json) {
568 server.subscription_lifecycle = Arc::new(lifecycle);
569 }
570 }
571
572 Ok(server)
573 }
574
575 #[allow(clippy::too_many_arguments)]
580 async fn from_executor(
584 config: ServerConfig,
585 executor: Arc<Executor<A>>,
586 subscription_manager: Arc<SubscriptionManager>,
587 circuit_breaker: Option<
588 Arc<crate::federation::circuit_breaker::FederationCircuitBreakerManager>,
589 >,
590 error_sanitizer: Arc<crate::config::error_sanitization::ErrorSanitizer>,
591 state_encryption: Option<Arc<crate::auth::state_encryption::StateEncryptionService>>,
592 pkce_store: Option<Arc<crate::auth::PkceStateStore>>,
593 oidc_server_client: Option<Arc<crate::auth::OidcServerClient>>,
594 schema_rate_limiter: Option<Arc<RateLimiter>>,
595 api_key_authenticator: Option<Arc<crate::api_key::ApiKeyAuthenticator>>,
596 revocation_manager: Option<Arc<crate::token_revocation::TokenRevocationManager>>,
597 trusted_docs: Option<Arc<crate::trusted_documents::TrustedDocumentStore>>,
598 #[allow(unused_variables)] db_pool: Option<sqlx::PgPool>,
599 ) -> Result<Self> {
600 let oidc_validator = if let Some(ref auth_config) = config.auth {
602 info!(
603 issuer = %auth_config.issuer,
604 "Initializing OIDC authentication"
605 );
606 let validator = OidcValidator::new(auth_config.clone())
607 .await
608 .map_err(|e| ServerError::ConfigError(format!("Failed to initialize OIDC: {e}")))?;
609 Some(Arc::new(validator))
610 } else {
611 None
612 };
613
614 let rate_limiter = if let Some(rl) = schema_rate_limiter {
616 Some(rl)
617 } else if let Some(ref rate_config) = config.rate_limiting {
618 if rate_config.enabled {
619 info!(
620 rps_per_ip = rate_config.rps_per_ip,
621 rps_per_user = rate_config.rps_per_user,
622 "Initializing rate limiting from server config"
623 );
624 let limiter_config = crate::middleware::RateLimitConfig {
625 enabled: true,
626 rps_per_ip: rate_config.rps_per_ip,
627 rps_per_user: rate_config.rps_per_user,
628 burst_size: rate_config.burst_size,
629 cleanup_interval_secs: rate_config.cleanup_interval_secs,
630 trust_proxy_headers: false,
631 };
632 Some(Arc::new(RateLimiter::new(limiter_config)))
633 } else {
634 info!("Rate limiting disabled by configuration");
635 None
636 }
637 } else {
638 None
639 };
640
641 #[cfg(feature = "observers")]
643 let observer_runtime = Self::init_observer_runtime(&config, db_pool.as_ref()).await;
644
645 #[cfg(feature = "arrow")]
647 let flight_service = {
648 let mut service = FraiseQLFlightService::new();
649 if let Some(ref validator) = oidc_validator {
650 info!("Enabling OIDC authentication for Arrow Flight");
651 service.set_oidc_validator(validator.clone());
652 } else {
653 info!("Arrow Flight initialized without authentication (dev mode)");
654 }
655 Some(service)
656 };
657
658 if pkce_store.is_some() && oidc_server_client.is_none() {
660 tracing::error!(
661 "pkce.enabled = true but [auth] is not configured or OIDC client init failed. \
662 Auth routes (/auth/start, /auth/callback) will NOT be mounted. \
663 Add [auth] with discovery_url, client_id, client_secret_env, and \
664 server_redirect_uri to fraiseql.toml and recompile the schema."
665 );
666 }
667
668 Self::check_redis_requirement(pkce_store.as_ref())?;
670
671 if let Some(ref store) = pkce_store {
673 use std::time::Duration;
674 use tokio::time::MissedTickBehavior;
675 let store_clone = Arc::clone(store);
676 tokio::spawn(async move {
677 let mut ticker = tokio::time::interval(Duration::from_secs(300));
678 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
679 loop {
680 ticker.tick().await;
681 store_clone.cleanup_expired().await;
682 }
683 });
684 }
685
686 Ok(Self {
687 config,
688 executor,
689 subscription_manager,
690 subscription_lifecycle: Arc::new(crate::subscriptions::NoopLifecycle),
691 max_subscriptions_per_connection: None,
692 oidc_validator,
693 rate_limiter,
694 secrets_manager: None,
695 circuit_breaker,
696 error_sanitizer,
697 state_encryption,
698 pkce_store,
699 oidc_server_client,
700 api_key_authenticator,
701 revocation_manager,
702 apq_store: None,
703 trusted_docs,
704 #[cfg(feature = "observers")]
705 observer_runtime,
706 #[cfg(feature = "observers")]
707 db_pool,
708 #[cfg(feature = "arrow")]
709 flight_service,
710 #[cfg(feature = "mcp")]
711 mcp_config: None,
712 })
713 }
714
715 #[must_use]
717 pub fn with_subscription_lifecycle(
718 mut self,
719 lifecycle: Arc<dyn crate::subscriptions::SubscriptionLifecycle>,
720 ) -> Self {
721 self.subscription_lifecycle = lifecycle;
722 self
723 }
724
725 #[must_use]
727 pub fn with_max_subscriptions_per_connection(mut self, max: u32) -> Self {
728 self.max_subscriptions_per_connection = Some(max);
729 self
730 }
731
732 pub fn set_secrets_manager(&mut self, manager: Arc<crate::secrets_manager::SecretsManager>) {
736 self.secrets_manager = Some(manager);
737 info!("Secrets manager attached to server");
738 }
739
740 #[cfg(feature = "mcp")]
750 pub async fn serve_mcp_stdio(self) -> Result<()> {
751 let mcp_cfg = self.mcp_config.ok_or_else(|| {
752 ServerError::ConfigError(
753 "FRAISEQL_MCP_STDIO=1 but MCP is not configured. \
754 Add [mcp] enabled = true to fraiseql.toml and recompile the schema."
755 .into(),
756 )
757 })?;
758
759 let schema = Arc::new(self.executor.schema().clone());
760 let executor = self.executor.clone();
761
762 let service = crate::mcp::handler::FraiseQLMcpService::new(
763 schema,
764 executor,
765 mcp_cfg,
766 );
767
768 info!("MCP stdio transport starting — reading from stdin, writing to stdout");
769
770 use rmcp::ServiceExt;
771 let running = service
772 .serve((tokio::io::stdin(), tokio::io::stdout()))
773 .await
774 .map_err(|e| ServerError::ConfigError(format!("MCP stdio init failed: {e}")))?;
775
776 running.waiting().await
777 .map_err(|e| ServerError::ConfigError(format!("MCP stdio error: {e}")))?;
778
779 Ok(())
780 }
781}
782
783impl<A: DatabaseAdapter + RelayDatabaseAdapter + Clone + Send + Sync + 'static> Server<A> {
784 pub async fn with_relay_pagination(
812 config: ServerConfig,
813 schema: CompiledSchema,
814 adapter: Arc<A>,
815 db_pool: Option<sqlx::PgPool>,
816 ) -> Result<Self> {
817 let circuit_breaker = schema
819 .federation
820 .as_ref()
821 .and_then(crate::federation::circuit_breaker::FederationCircuitBreakerManager::from_schema_json);
822 let error_sanitizer = Self::error_sanitizer_from_schema(&schema);
823 let state_encryption = Self::state_encryption_from_schema(&schema)?;
824 let pkce_store = Self::pkce_store_from_schema(&schema, state_encryption.as_ref()).await;
825 let oidc_server_client = Self::oidc_server_client_from_schema(&schema);
826 let schema_rate_limiter = Self::rate_limiter_from_schema(&schema).await;
827 let api_key_authenticator = crate::api_key::api_key_authenticator_from_schema(&schema);
828 let revocation_manager = crate::token_revocation::revocation_manager_from_schema(&schema);
829 let trusted_docs = Self::trusted_docs_from_schema(&schema);
830
831 let executor = Arc::new(Executor::new_with_relay(schema.clone(), adapter));
832 let subscription_manager = Arc::new(SubscriptionManager::new(Arc::new(schema)));
833
834 let mut server = Self::from_executor(
835 config,
836 executor,
837 subscription_manager,
838 circuit_breaker,
839 error_sanitizer,
840 state_encryption,
841 pkce_store,
842 oidc_server_client,
843 schema_rate_limiter,
844 api_key_authenticator,
845 revocation_manager,
846 trusted_docs,
847 db_pool,
848 )
849 .await?;
850
851 #[cfg(feature = "mcp")]
853 {
854 if let Some(ref mcp_json) = server.executor.schema().mcp_config {
855 match serde_json::from_value::<crate::mcp::McpConfig>(mcp_json.clone()) {
856 Ok(cfg) if cfg.enabled => {
857 let tool_count = crate::mcp::tools::schema_to_tools(
858 server.executor.schema(),
859 &cfg,
860 ).len();
861 info!(
862 path = %cfg.path,
863 transport = %cfg.transport,
864 tools = tool_count,
865 "MCP server configured"
866 );
867 server.mcp_config = Some(cfg);
868 }
869 Ok(_) => {}
870 Err(e) => {
871 warn!(error = %e, "Invalid mcp_config in compiled schema — MCP disabled");
872 }
873 }
874 }
875 }
876
877 if server.config.apq_enabled {
879 let apq_store: Arc<dyn fraiseql_core::apq::ApqStorage> =
880 Arc::new(fraiseql_core::apq::InMemoryApqStorage::default());
881 server.apq_store = Some(apq_store);
882 info!("APQ (Automatic Persisted Queries) enabled — in-memory backend");
883 }
884
885 Ok(server)
886 }
887}
888
889impl<A: DatabaseAdapter + Clone + Send + Sync + 'static> Server<A> {
890 #[cfg(feature = "arrow")]
906 pub async fn with_flight_service(
907 config: ServerConfig,
908 schema: CompiledSchema,
909 adapter: Arc<A>,
910 #[allow(unused_variables)] db_pool: Option<sqlx::PgPool>,
911 flight_service: Option<FraiseQLFlightService>,
912 ) -> Result<Self> {
913 let circuit_breaker = schema
915 .federation
916 .as_ref()
917 .and_then(crate::federation::circuit_breaker::FederationCircuitBreakerManager::from_schema_json);
918 let error_sanitizer = Self::error_sanitizer_from_schema(&schema);
919 let state_encryption = Self::state_encryption_from_schema(&schema)?;
920 let pkce_store = Self::pkce_store_from_schema(&schema, state_encryption.as_ref()).await;
921 let oidc_server_client = Self::oidc_server_client_from_schema(&schema);
922 let schema_rate_limiter = Self::rate_limiter_from_schema(&schema).await;
923 let api_key_authenticator = crate::api_key::api_key_authenticator_from_schema(&schema);
924 let revocation_manager = crate::token_revocation::revocation_manager_from_schema(&schema);
925 let trusted_docs = Self::trusted_docs_from_schema(&schema);
926
927 let executor = Arc::new(Executor::new(schema.clone(), adapter));
928 let subscription_manager = Arc::new(SubscriptionManager::new(Arc::new(schema)));
929
930 let oidc_validator = if let Some(ref auth_config) = config.auth {
932 info!(
933 issuer = %auth_config.issuer,
934 "Initializing OIDC authentication"
935 );
936 let validator = OidcValidator::new(auth_config.clone())
937 .await
938 .map_err(|e| ServerError::ConfigError(format!("Failed to initialize OIDC: {e}")))?;
939 Some(Arc::new(validator))
940 } else {
941 None
942 };
943
944 let rate_limiter = if let Some(rl) = schema_rate_limiter {
946 Some(rl)
947 } else if let Some(ref rate_config) = config.rate_limiting {
948 if rate_config.enabled {
949 info!(
950 rps_per_ip = rate_config.rps_per_ip,
951 rps_per_user = rate_config.rps_per_user,
952 "Initializing rate limiting from server config"
953 );
954 let limiter_config = crate::middleware::RateLimitConfig {
955 enabled: true,
956 rps_per_ip: rate_config.rps_per_ip,
957 rps_per_user: rate_config.rps_per_user,
958 burst_size: rate_config.burst_size,
959 cleanup_interval_secs: rate_config.cleanup_interval_secs,
960 trust_proxy_headers: false,
961 };
962 Some(Arc::new(RateLimiter::new(limiter_config)))
963 } else {
964 info!("Rate limiting disabled by configuration");
965 None
966 }
967 } else {
968 None
969 };
970
971 #[cfg(feature = "observers")]
973 let observer_runtime = Self::init_observer_runtime(&config, db_pool.as_ref()).await;
974
975 if pkce_store.is_some() && oidc_server_client.is_none() {
977 tracing::error!(
978 "pkce.enabled = true but [auth] is not configured or OIDC client init failed. \
979 Auth routes will NOT be mounted."
980 );
981 }
982
983 Self::check_redis_requirement(pkce_store.as_ref())?;
985
986 if let Some(ref store) = pkce_store {
988 use std::time::Duration;
989 use tokio::time::MissedTickBehavior;
990 let store_clone = Arc::clone(store);
991 tokio::spawn(async move {
992 let mut ticker = tokio::time::interval(Duration::from_secs(300));
993 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
994 loop {
995 ticker.tick().await;
996 store_clone.cleanup_expired().await;
997 }
998 });
999 }
1000
1001 let apq_enabled = config.apq_enabled;
1002
1003 Ok(Self {
1004 config,
1005 executor,
1006 subscription_manager,
1007 subscription_lifecycle: Arc::new(crate::subscriptions::NoopLifecycle),
1008 max_subscriptions_per_connection: None,
1009 oidc_validator,
1010 rate_limiter,
1011 secrets_manager: None,
1012 circuit_breaker,
1013 error_sanitizer,
1014 state_encryption,
1015 pkce_store,
1016 oidc_server_client,
1017 api_key_authenticator,
1018 revocation_manager,
1019 apq_store: if apq_enabled {
1020 Some(Arc::new(fraiseql_core::apq::InMemoryApqStorage::default())
1021 as Arc<dyn fraiseql_core::apq::ApqStorage>)
1022 } else {
1023 None
1024 },
1025 trusted_docs,
1026 mcp_config: None,
1027 #[cfg(feature = "observers")]
1028 observer_runtime,
1029 #[cfg(feature = "observers")]
1030 db_pool,
1031 flight_service,
1032 })
1033 }
1034
1035 #[cfg(feature = "observers")]
1037 async fn init_observer_runtime(
1038 config: &ServerConfig,
1039 pool: Option<&sqlx::PgPool>,
1040 ) -> Option<Arc<RwLock<ObserverRuntime>>> {
1041 let observer_config = match &config.observers {
1043 Some(cfg) if cfg.enabled => cfg,
1044 _ => {
1045 info!("Observer runtime disabled");
1046 return None;
1047 },
1048 };
1049
1050 let pool = match pool {
1051 Some(p) => p,
1052 None => {
1053 warn!("No database pool provided for observers");
1054 return None;
1055 },
1056 };
1057
1058 info!("Initializing observer runtime");
1059
1060 let runtime_config = ObserverRuntimeConfig::new(pool.clone())
1061 .with_poll_interval(observer_config.poll_interval_ms)
1062 .with_batch_size(observer_config.batch_size)
1063 .with_channel_capacity(observer_config.channel_capacity);
1064
1065 let runtime = ObserverRuntime::new(runtime_config);
1066 Some(Arc::new(RwLock::new(runtime)))
1067 }
1068
1069 fn build_router(&self) -> Router {
1071 let mut state = AppState::new(self.executor.clone());
1072
1073 if let Some(ref secrets_manager) = self.secrets_manager {
1075 state = state.with_secrets_manager(secrets_manager.clone());
1076 info!("SecretsManager attached to AppState");
1077 }
1078
1079 if let Some(ref cb) = self.circuit_breaker {
1081 state = state.with_circuit_breaker(cb.clone());
1082 info!("Federation circuit breaker attached to AppState");
1083 }
1084
1085 state = state.with_error_sanitizer(self.error_sanitizer.clone());
1087 if self.error_sanitizer.is_enabled() {
1088 info!("Error sanitizer enabled — internal error details will be stripped from responses");
1089 }
1090
1091 if let Some(ref api_key_auth) = self.api_key_authenticator {
1093 state = state.with_api_key_authenticator(api_key_auth.clone());
1094 info!("API key authenticator attached to AppState");
1095 }
1096
1097 match &self.state_encryption {
1099 Some(svc) => {
1100 state = state.with_state_encryption(svc.clone());
1101 info!("State encryption: enabled");
1102 },
1103 None => {
1104 info!("State encryption: disabled (no key configured)");
1105 },
1106 }
1107
1108 let mut validator = crate::validation::RequestValidator::new();
1110 if let Some(ref vc) = self.executor.schema().validation_config {
1111 if let Some(depth) = vc.get("max_query_depth").and_then(serde_json::Value::as_u64) {
1112 validator = validator.with_max_depth(depth as usize);
1113 info!(max_query_depth = depth, "Custom query depth limit configured");
1114 }
1115 if let Some(complexity) = vc.get("max_query_complexity").and_then(serde_json::Value::as_u64) {
1116 validator = validator.with_max_complexity(complexity as usize);
1117 info!(max_query_complexity = complexity, "Custom query complexity limit configured");
1118 }
1119 }
1120 state = state.with_validator(validator);
1121
1122 state.debug_config.clone_from(&self.executor.schema().debug_config);
1124
1125 if let Some(ref store) = self.apq_store {
1127 state = state.with_apq_store(store.clone());
1128 }
1129
1130 if let Some(ref store) = self.trusted_docs {
1132 state = state.with_trusted_docs(store.clone());
1133 }
1134
1135 let metrics = state.metrics.clone();
1136
1137 let graphql_router = if let Some(ref validator) = self.oidc_validator {
1140 info!(
1141 graphql_path = %self.config.graphql_path,
1142 "GraphQL endpoint protected by OIDC authentication (GET and POST)"
1143 );
1144 let auth_state = OidcAuthState::new(validator.clone());
1145 let router = Router::new()
1146 .route(
1147 &self.config.graphql_path,
1148 get(graphql_get_handler::<A>).post(graphql_handler::<A>),
1149 )
1150 .route_layer(middleware::from_fn_with_state(auth_state, oidc_auth_middleware));
1151
1152 if self.config.require_json_content_type {
1153 router
1154 .route_layer(middleware::from_fn(require_json_content_type))
1155 .with_state(state.clone())
1156 } else {
1157 router.with_state(state.clone())
1158 }
1159 } else {
1160 let router = Router::new()
1161 .route(
1162 &self.config.graphql_path,
1163 get(graphql_get_handler::<A>).post(graphql_handler::<A>),
1164 );
1165
1166 if self.config.require_json_content_type {
1167 router
1168 .route_layer(middleware::from_fn(require_json_content_type))
1169 .with_state(state.clone())
1170 } else {
1171 router.with_state(state.clone())
1172 }
1173 };
1174
1175 let mut app = Router::new()
1177 .route(&self.config.health_path, get(health_handler::<A>))
1178 .with_state(state.clone())
1179 .merge(graphql_router);
1180
1181 if self.config.playground_enabled {
1183 let playground_state =
1184 PlaygroundState::new(self.config.graphql_path.clone(), self.config.playground_tool);
1185 info!(
1186 playground_path = %self.config.playground_path,
1187 playground_tool = ?self.config.playground_tool,
1188 "GraphQL playground enabled"
1189 );
1190 let playground_router = Router::new()
1191 .route(&self.config.playground_path, get(playground_handler))
1192 .with_state(playground_state);
1193 app = app.merge(playground_router);
1194 }
1195
1196 if self.config.subscriptions_enabled {
1198 let subscription_state = SubscriptionState::new(self.subscription_manager.clone())
1199 .with_lifecycle(self.subscription_lifecycle.clone())
1200 .with_max_subscriptions(self.max_subscriptions_per_connection);
1201 info!(
1202 subscription_path = %self.config.subscription_path,
1203 "GraphQL subscriptions enabled (graphql-transport-ws + graphql-ws protocols)"
1204 );
1205 let subscription_router = Router::new()
1206 .route(&self.config.subscription_path, get(subscription_handler))
1207 .with_state(subscription_state);
1208 app = app.merge(subscription_router);
1209 }
1210
1211 if self.config.introspection_enabled {
1213 if self.config.introspection_require_auth {
1214 if let Some(ref validator) = self.oidc_validator {
1215 info!(
1216 introspection_path = %self.config.introspection_path,
1217 "Introspection endpoint enabled (OIDC auth required)"
1218 );
1219 let auth_state = OidcAuthState::new(validator.clone());
1220 let introspection_router = Router::new()
1221 .route(&self.config.introspection_path, get(introspection_handler::<A>))
1222 .route_layer(middleware::from_fn_with_state(
1223 auth_state.clone(),
1224 oidc_auth_middleware,
1225 ))
1226 .with_state(state.clone());
1227 app = app.merge(introspection_router);
1228
1229 let schema_router = Router::new()
1231 .route("/api/v1/schema.graphql", get(api::schema::export_sdl_handler::<A>))
1232 .route("/api/v1/schema.json", get(api::schema::export_json_handler::<A>))
1233 .route_layer(middleware::from_fn_with_state(
1234 auth_state,
1235 oidc_auth_middleware,
1236 ))
1237 .with_state(state.clone());
1238 app = app.merge(schema_router);
1239 } else {
1240 warn!(
1241 "introspection_require_auth is true but no OIDC configured - introspection and schema export disabled"
1242 );
1243 }
1244 } else {
1245 info!(
1246 introspection_path = %self.config.introspection_path,
1247 "Introspection endpoint enabled (no auth required - USE ONLY IN DEVELOPMENT)"
1248 );
1249 let introspection_router = Router::new()
1250 .route(&self.config.introspection_path, get(introspection_handler::<A>))
1251 .with_state(state.clone());
1252 app = app.merge(introspection_router);
1253
1254 let schema_router = Router::new()
1257 .route("/api/v1/schema.graphql", get(api::schema::export_sdl_handler::<A>))
1258 .route("/api/v1/schema.json", get(api::schema::export_json_handler::<A>))
1259 .with_state(state.clone());
1260 app = app.merge(schema_router);
1261 }
1262 }
1263
1264 if self.config.metrics_enabled {
1266 if let Some(ref token) = self.config.metrics_token {
1267 info!(
1268 metrics_path = %self.config.metrics_path,
1269 metrics_json_path = %self.config.metrics_json_path,
1270 "Metrics endpoints enabled (bearer token required)"
1271 );
1272
1273 let auth_state = BearerAuthState::new(token.clone());
1274
1275 let metrics_router = Router::new()
1278 .route(&self.config.metrics_path, get(metrics_handler::<A>))
1279 .route(&self.config.metrics_json_path, get(metrics_json_handler::<A>))
1280 .route_layer(middleware::from_fn_with_state(auth_state, bearer_auth_middleware))
1281 .with_state(state.clone());
1282
1283 app = app.merge(metrics_router);
1284 } else {
1285 warn!(
1286 "metrics_enabled is true but metrics_token is not set - metrics endpoints disabled"
1287 );
1288 }
1289 }
1290
1291 if self.config.admin_api_enabled {
1293 if let Some(ref token) = self.config.admin_token {
1294 info!("Admin API endpoints enabled (bearer token required)");
1295
1296 let auth_state = BearerAuthState::new(token.clone());
1297
1298 let admin_router = Router::new()
1300 .route(
1301 "/api/v1/admin/reload-schema",
1302 post(api::admin::reload_schema_handler::<A>),
1303 )
1304 .route("/api/v1/admin/cache/clear", post(api::admin::cache_clear_handler::<A>))
1305 .route("/api/v1/admin/cache/stats", get(api::admin::cache_stats_handler::<A>))
1306 .route("/api/v1/admin/config", get(api::admin::config_handler::<A>))
1307 .route_layer(middleware::from_fn_with_state(auth_state, bearer_auth_middleware))
1308 .with_state(state.clone());
1309
1310 app = app.merge(admin_router);
1311 } else {
1312 warn!(
1313 "admin_api_enabled is true but admin_token is not set - admin endpoints disabled"
1314 );
1315 }
1316 }
1317
1318 if self.config.design_api_require_auth {
1320 if let Some(ref validator) = self.oidc_validator {
1321 info!("Design audit API endpoints enabled (OIDC auth required)");
1322 let auth_state = OidcAuthState::new(validator.clone());
1323 let design_router = Router::new()
1324 .route(
1325 "/design/federation-audit",
1326 post(api::design::federation_audit_handler::<A>),
1327 )
1328 .route("/design/cost-audit", post(api::design::cost_audit_handler::<A>))
1329 .route("/design/cache-audit", post(api::design::cache_audit_handler::<A>))
1330 .route("/design/auth-audit", post(api::design::auth_audit_handler::<A>))
1331 .route(
1332 "/design/compilation-audit",
1333 post(api::design::compilation_audit_handler::<A>),
1334 )
1335 .route("/design/audit", post(api::design::overall_design_audit_handler::<A>))
1336 .route_layer(middleware::from_fn_with_state(auth_state, oidc_auth_middleware))
1337 .with_state(state.clone());
1338 app = app.nest("/api/v1", design_router);
1339 } else {
1340 warn!(
1341 "design_api_require_auth is true but no OIDC configured - design endpoints unprotected"
1342 );
1343 let design_router = Router::new()
1345 .route(
1346 "/design/federation-audit",
1347 post(api::design::federation_audit_handler::<A>),
1348 )
1349 .route("/design/cost-audit", post(api::design::cost_audit_handler::<A>))
1350 .route("/design/cache-audit", post(api::design::cache_audit_handler::<A>))
1351 .route("/design/auth-audit", post(api::design::auth_audit_handler::<A>))
1352 .route(
1353 "/design/compilation-audit",
1354 post(api::design::compilation_audit_handler::<A>),
1355 )
1356 .route("/design/audit", post(api::design::overall_design_audit_handler::<A>))
1357 .with_state(state.clone());
1358 app = app.nest("/api/v1", design_router);
1359 }
1360 } else {
1361 info!("Design audit API endpoints enabled (no auth required)");
1362 let design_router = Router::new()
1363 .route("/design/federation-audit", post(api::design::federation_audit_handler::<A>))
1364 .route("/design/cost-audit", post(api::design::cost_audit_handler::<A>))
1365 .route("/design/cache-audit", post(api::design::cache_audit_handler::<A>))
1366 .route("/design/auth-audit", post(api::design::auth_audit_handler::<A>))
1367 .route(
1368 "/design/compilation-audit",
1369 post(api::design::compilation_audit_handler::<A>),
1370 )
1371 .route("/design/audit", post(api::design::overall_design_audit_handler::<A>))
1372 .with_state(state.clone());
1373 app = app.nest("/api/v1", design_router);
1374 }
1375
1376 if let (Some(store), Some(client)) = (&self.pkce_store, &self.oidc_server_client) {
1378 let auth_state = Arc::new(AuthPkceState {
1379 pkce_store: Arc::clone(store),
1380 oidc_client: Arc::clone(client),
1381 http_client: Arc::new(reqwest::Client::new()),
1382 post_login_redirect_uri: None,
1383 });
1384 let auth_router = Router::new()
1385 .route("/auth/start", get(auth_start))
1386 .route("/auth/callback", get(auth_callback))
1387 .with_state(auth_state);
1388 app = app.merge(auth_router);
1389 info!("PKCE auth routes mounted: GET /auth/start, GET /auth/callback");
1390 }
1391
1392 if let Some(ref rev_mgr) = self.revocation_manager {
1394 let rev_state = Arc::new(crate::routes::RevocationRouteState {
1395 revocation_manager: Arc::clone(rev_mgr),
1396 });
1397 let rev_router = Router::new()
1398 .route("/auth/revoke", post(crate::routes::revoke_token))
1399 .route("/auth/revoke-all", post(crate::routes::revoke_all_tokens))
1400 .with_state(rev_state);
1401 app = app.merge(rev_router);
1402 info!("Token revocation routes mounted: POST /auth/revoke, POST /auth/revoke-all");
1403 }
1404
1405 #[cfg(feature = "mcp")]
1408 if let Some(ref mcp_cfg) = self.mcp_config {
1409 if mcp_cfg.transport == "http" || mcp_cfg.transport == "both" {
1410 use rmcp::transport::{StreamableHttpServerConfig, StreamableHttpService};
1411 use rmcp::transport::streamable_http_server::session::local::LocalSessionManager;
1412
1413 let schema = Arc::new(self.executor.schema().clone());
1414 let executor = self.executor.clone();
1415 let cfg = mcp_cfg.clone();
1416 let mcp_service = StreamableHttpService::new(
1417 move || {
1418 Ok(crate::mcp::handler::FraiseQLMcpService::new(
1419 schema.clone(),
1420 executor.clone(),
1421 cfg.clone(),
1422 ))
1423 },
1424 Arc::new(LocalSessionManager::default()),
1425 StreamableHttpServerConfig::default(),
1426 );
1427 app = app.nest_service(&mcp_cfg.path, mcp_service);
1428 info!(path = %mcp_cfg.path, "MCP HTTP endpoint mounted");
1429 }
1430 }
1431
1432 let api_router = api::routes(state.clone());
1434 app = app.nest("/api/v1", api_router);
1435
1436 #[cfg(feature = "observers")]
1438 if let Some(ref db_pool) = self.db_pool {
1439 info!("Adding RBAC Management API endpoints");
1440 let rbac_backend = Arc::new(
1441 crate::api::rbac_management::db_backend::RbacDbBackend::new(db_pool.clone()),
1442 );
1443 let rbac_state = crate::api::RbacManagementState { db: rbac_backend };
1444 let rbac_router = crate::api::rbac_management_router(rbac_state);
1445 app = app.merge(rbac_router);
1446 }
1447
1448 app = app.layer(middleware::from_fn_with_state(metrics, metrics_middleware));
1451
1452 #[cfg(feature = "observers")]
1454 {
1455 app = self.add_observer_routes(app);
1456 }
1457
1458 if self.config.tracing_enabled {
1460 app = app.layer(trace_layer());
1461 }
1462
1463 if self.config.cors_enabled {
1464 let origins = if self.config.cors_origins.is_empty() {
1466 tracing::warn!(
1468 "CORS enabled but no origins configured. Using localhost:3000 as default. \
1469 Set cors_origins in config for production."
1470 );
1471 vec!["http://localhost:3000".to_string()]
1472 } else {
1473 self.config.cors_origins.clone()
1474 };
1475 app = app.layer(cors_layer_restricted(origins));
1476 }
1477
1478 if self.config.max_request_body_bytes > 0 {
1480 info!(
1481 max_bytes = self.config.max_request_body_bytes,
1482 "Request body size limit enabled"
1483 );
1484 app = app.layer(DefaultBodyLimit::max(self.config.max_request_body_bytes));
1485 }
1486
1487 if let Some(ref limiter) = self.rate_limiter {
1489 use std::net::SocketAddr;
1490
1491 use axum::extract::ConnectInfo;
1492
1493 info!("Enabling rate limiting middleware");
1494 let limiter_clone = limiter.clone();
1495 app = app.layer(middleware::from_fn(move |ConnectInfo(addr): ConnectInfo<SocketAddr>, req, next: axum::middleware::Next| {
1496 let limiter = limiter_clone.clone();
1497 async move {
1498 let ip = addr.ip().to_string();
1499
1500 let check = limiter.check_ip_limit(&ip).await;
1502 if !check.allowed {
1503 warn!(ip = %ip, "IP rate limit exceeded");
1504 use axum::http::StatusCode;
1505 use axum::response::IntoResponse;
1506 let retry = check.retry_after_secs;
1507 let retry_str = retry.to_string();
1508 let body = format!(
1509 r#"{{"errors":[{{"message":"Rate limit exceeded. Please retry after {retry} second{s}."}}]}}"#,
1510 s = if retry == 1 { "" } else { "s" }
1511 );
1512 return (
1513 StatusCode::TOO_MANY_REQUESTS,
1514 [("Content-Type", "application/json"), ("Retry-After", retry_str.as_str())],
1515 body,
1516 ).into_response();
1517 }
1518
1519 let remaining = check.remaining;
1521 let mut response = next.run(req).await;
1522
1523 let headers = response.headers_mut();
1525 if let Ok(limit_value) = format!("{}", limiter.config().rps_per_ip).parse() {
1526 headers.insert("X-RateLimit-Limit", limit_value);
1527 }
1528 if let Ok(remaining_value) = format!("{}", remaining as u32).parse() {
1529 headers.insert("X-RateLimit-Remaining", remaining_value);
1530 }
1531
1532 response
1533 }
1534 }));
1535 }
1536
1537 app
1538 }
1539
/// Mounts the observer management routes under `/api/observers` and, when an
/// observer runtime is configured, merges the runtime health routes as well.
///
/// The management routes need a database pool to build their repository. If
/// no pool is configured the router is returned unchanged and a warning is
/// logged, instead of panicking while the router is being assembled.
#[cfg(feature = "observers")]
fn add_observer_routes(&self, app: Router) -> Router {
    use crate::observers::{
        ObserverRepository, ObserverState, RuntimeHealthState, observer_routes,
        observer_runtime_routes,
    };

    // `db_pool` is optional; degrade gracefully rather than aborting startup.
    let Some(pool) = self.db_pool.clone() else {
        warn!("Observers feature enabled but no database pool configured - observer endpoints disabled");
        return app;
    };

    let observer_state = ObserverState {
        repository: ObserverRepository::new(pool),
    };

    let app = app.nest("/api/observers", observer_routes(observer_state));

    if let Some(ref runtime) = self.observer_runtime {
        info!(
            path = "/api/observers",
            "Observer management and runtime health endpoints enabled"
        );

        let runtime_state = RuntimeHealthState {
            runtime: runtime.clone(),
        };

        app.merge(observer_runtime_routes(runtime_state))
    } else {
        app
    }
}
1573
1574 pub async fn serve(self) -> Result<()> {
1580 self.serve_with_shutdown(Self::shutdown_signal()).await
1581 }
1582
1583 pub async fn serve_with_shutdown<F>(self, shutdown: F) -> Result<()>
1592 where
1593 F: std::future::Future<Output = ()> + Send + 'static,
1594 {
1595 let app = self.build_router();
1596
1597 let tls_setup = TlsSetup::new(self.config.tls.clone(), self.config.database_tls.clone())?;
1599
1600 info!(
1601 bind_addr = %self.config.bind_addr,
1602 graphql_path = %self.config.graphql_path,
1603 tls_enabled = tls_setup.is_tls_enabled(),
1604 "Starting FraiseQL server"
1605 );
1606
1607 #[cfg(feature = "observers")]
1609 if let Some(ref runtime) = self.observer_runtime {
1610 info!("Starting observer runtime...");
1611 let mut guard = runtime.write().await;
1612
1613 match guard.start().await {
1614 Ok(()) => info!("Observer runtime started"),
1615 Err(e) => {
1616 error!("Failed to start observer runtime: {}", e);
1617 warn!("Server will continue without observers");
1618 },
1619 }
1620 drop(guard);
1621 }
1622
1623 let listener = TcpListener::bind(self.config.bind_addr)
1624 .await
1625 .map_err(|e| ServerError::BindError(e.to_string()))?;
1626
1627 if tls_setup.is_tls_enabled() {
1629 let _ = tls_setup.create_rustls_config()?;
1631 info!(
1632 cert_path = ?tls_setup.cert_path(),
1633 key_path = ?tls_setup.key_path(),
1634 mtls_required = tls_setup.is_mtls_required(),
1635 "Server TLS configuration loaded (note: use reverse proxy for server-side TLS termination)"
1636 );
1637 }
1638
1639 info!(
1641 postgres_ssl_mode = tls_setup.postgres_ssl_mode(),
1642 redis_ssl = tls_setup.redis_ssl_enabled(),
1643 clickhouse_https = tls_setup.clickhouse_https_enabled(),
1644 elasticsearch_https = tls_setup.elasticsearch_https_enabled(),
1645 "Database connection TLS configuration applied"
1646 );
1647
1648 info!("Server listening on http://{}", self.config.bind_addr);
1649
1650 #[cfg(feature = "arrow")]
1652 if let Some(flight_service) = self.flight_service {
1653 let flight_addr = "0.0.0.0:50051".parse().expect("Valid Flight address");
1655 info!("Arrow Flight server listening on grpc://{}", flight_addr);
1656
1657 let flight_server = tokio::spawn(async move {
1659 tonic::transport::Server::builder()
1660 .add_service(flight_service.into_server())
1661 .serve(flight_addr)
1662 .await
1663 });
1664
1665 #[cfg(feature = "observers")]
1667 let observer_runtime = self.observer_runtime.clone();
1668
1669 let shutdown_with_cleanup = async move {
1670 shutdown.await;
1671 #[cfg(feature = "observers")]
1672 if let Some(ref runtime) = observer_runtime {
1673 info!("Shutting down observer runtime");
1674 let mut guard = runtime.write().await;
1675 if let Err(e) = guard.stop().await {
1676 #[cfg(feature = "observers")]
1677 error!("Error stopping runtime: {}", e);
1678 } else {
1679 info!("Runtime stopped cleanly");
1680 }
1681 }
1682 };
1683
1684 axum::serve(listener, app)
1686 .with_graceful_shutdown(shutdown_with_cleanup)
1687 .await
1688 .map_err(|e| ServerError::IoError(std::io::Error::other(e)))?;
1689
1690 flight_server.abort();
1692 }
1693
1694 #[cfg(not(feature = "arrow"))]
1696 {
1697 axum::serve(listener, app)
1698 .with_graceful_shutdown(shutdown)
1699 .await
1700 .map_err(|e| ServerError::IoError(std::io::Error::other(e)))?;
1701 }
1702
1703 Ok(())
1704 }
1705
1706 pub async fn shutdown_signal() {
1708 use tokio::signal;
1709
1710 let ctrl_c = async {
1711 signal::ctrl_c().await.expect("Failed to install Ctrl+C handler");
1712 };
1713
1714 #[cfg(unix)]
1715 let terminate = async {
1716 signal::unix::signal(signal::unix::SignalKind::terminate())
1717 .expect("Failed to install SIGTERM handler")
1718 .recv()
1719 .await;
1720 };
1721
1722 #[cfg(not(unix))]
1723 let terminate = std::future::pending::<()>();
1724
1725 tokio::select! {
1726 _ = ctrl_c => info!("Received Ctrl+C"),
1727 _ = terminate => info!("Received SIGTERM"),
1728 }
1729 }
1730}