1use std::sync::Arc;
4
5#[cfg(feature = "arrow")]
6use fraiseql_arrow::FraiseQLFlightService;
7use fraiseql_core::{
8 cache::{CacheConfig, CachedDatabaseAdapter, QueryResultCache},
9 db::traits::DatabaseAdapter,
10 runtime::{Executor, SubscriptionManager},
11 schema::CompiledSchema,
12 security::{AuthConfig, AuthMiddleware, OidcValidator},
13};
14use tracing::{info, warn};
15
16use super::{RateLimiter, Result, Server, ServerConfig, ServerError};
17
18pub(super) fn build_hs256_auth(config: &ServerConfig) -> Result<Option<Arc<AuthMiddleware>>> {
20 let Some(ref hs) = config.auth_hs256 else {
21 return Ok(None);
22 };
23 let secret = hs
24 .load_secret()
25 .map_err(|e| ServerError::ConfigError(format!("Failed to initialize HS256 auth: {e}")))?;
26 let mut auth_config = AuthConfig::with_hs256(&secret);
27 if let Some(ref iss) = hs.issuer {
28 auth_config = auth_config.with_issuer(iss);
29 }
30 if let Some(ref aud) = hs.audience {
31 auth_config = auth_config.with_audience(aud);
32 }
33 info!(
34 secret_env = %hs.secret_env,
35 issuer = ?hs.issuer,
36 audience = ?hs.audience,
37 "Initializing HS256 authentication (local validation, no network)"
38 );
39 Ok(Some(Arc::new(AuthMiddleware::from_config(auth_config))))
40}
41
42impl<A: DatabaseAdapter + Clone + Send + Sync + 'static> Server<CachedDatabaseAdapter<A>> {
    /// Creates a server whose database adapter is wrapped in a query-result cache.
    ///
    /// Startup proceeds in this order:
    ///
    /// 1. The compiled schema's format version is checked (a missing version
    ///    only produces a warning; an incompatible one is an error).
    /// 2. Schema-derived subsystems are built: federation circuit breaker,
    ///    error sanitizer, state encryption, PKCE store, OIDC server client,
    ///    rate limiter, API-key authenticator, token revocation manager and
    ///    trusted-document store. Several of these are gated on cargo features.
    /// 3. The cache configuration is checked against the schema's
    ///    multi-tenancy / Row-Level Security declarations.
    /// 4. `adapter` is unwrapped from its `Arc`, wrapped in a
    ///    [`CachedDatabaseAdapter`], and handed to an [`Executor`].
    /// 5. [`Self::from_executor`] assembles the server, after which pool
    ///    tuning, MCP, APQ and subscription settings are applied.
    ///
    /// # Errors
    ///
    /// Returns [`ServerError::ConfigError`] when the compiled schema format is
    /// incompatible, when caching is enabled for a multi-tenant schema without
    /// RLS policies, or when the pool-tuning configuration fails validation.
    /// Errors from state-encryption setup (with the `auth` feature) and from
    /// [`Self::from_executor`] are propagated.
    ///
    /// # Panics
    ///
    /// Panics if `adapter` is not the only strong reference to the underlying
    /// adapter, because `Arc::into_inner` then yields `None`.
    // Sequential startup wiring with many feature-gated steps; kept in one
    // function so the ordering stays visible.
    #[allow(clippy::cognitive_complexity)]
    pub async fn new(
        config: ServerConfig,
        schema: CompiledSchema,
        adapter: Arc<A>,
        db_pool: Option<sqlx::PgPool>,
    ) -> Result<Self> {
        // A schema without a format version is accepted, but flagged so that
        // operators know compatibility checking is not possible for it.
        if schema.schema_format_version.is_none() {
            warn!(
                "Loaded schema has no schema_format_version (pre-v2.1 format). \
                 Re-compile with the current fraiseql-cli for version compatibility checking."
            );
        }
        // An incompatible format version aborts startup.
        schema.validate_format_version().map_err(|msg| {
            ServerError::ConfigError(format!("Incompatible compiled schema: {msg}"))
        })?;

        // Federation circuit breaker: built from the schema's federation
        // section when the feature is on; otherwise a unit placeholder keeps
        // the `from_executor` call shape identical across feature sets.
        #[cfg(feature = "federation")]
        let circuit_breaker = schema.federation.as_ref().and_then(
            crate::federation::circuit_breaker::FederationCircuitBreakerManager::from_config,
        );
        #[cfg(not(feature = "federation"))]
        let circuit_breaker: Option<()> = None;
        // Touch the field so it counts as used when federation is compiled out.
        #[cfg(not(feature = "federation"))]
        let _ = &schema.federation;
        let error_sanitizer = Self::error_sanitizer_from_schema(&schema);
        // Auth-related subsystems: real values with the `auth` feature,
        // typed `None` placeholders without it.
        #[cfg(feature = "auth")]
        let state_encryption = Self::state_encryption_from_schema(&schema)?;
        #[cfg(not(feature = "auth"))]
        let state_encryption: Option<
            std::sync::Arc<crate::auth::state_encryption::StateEncryptionService>,
        > = None;
        #[cfg(feature = "auth")]
        let pkce_store = Self::pkce_store_from_schema(&schema, state_encryption.as_ref()).await;
        #[cfg(not(feature = "auth"))]
        let pkce_store: Option<std::sync::Arc<crate::auth::PkceStateStore>> = None;
        #[cfg(feature = "auth")]
        let oidc_server_client = Self::oidc_server_client_from_schema(&schema);
        #[cfg(not(feature = "auth"))]
        let oidc_server_client: Option<std::sync::Arc<crate::auth::OidcServerClient>> = None;
        let schema_rate_limiter = Self::rate_limiter_from_schema(&schema).await;
        let api_key_authenticator = crate::api_key::api_key_authenticator_from_schema(&schema);
        if api_key_authenticator.is_some() {
            info!("API key authentication enabled");
        }
        let revocation_manager = crate::token_revocation::revocation_manager_from_schema(&schema);
        if revocation_manager.is_some() {
            info!("Token revocation enabled");
        }
        let trusted_docs = Self::trusted_docs_from_schema(&schema);

        // Cache safety gate: caching without RLS is refused outright for
        // multi-tenant schemas and merely warned about otherwise.
        if config.cache_enabled && !schema.has_rls_configured() {
            if schema.is_multi_tenant() {
                return Err(ServerError::ConfigError(
                    "Cache is enabled in a multi-tenant schema but no Row-Level Security \
                     policies are declared. This would allow cross-tenant cache hits and \
                     data leakage. In fraiseql.toml, either disable caching with \
                     [cache] enabled = false, declare [security.rls] policies, or set \
                     [security] multi_tenant = false to acknowledge single-tenant mode."
                        .to_string(),
                ));
            }
            warn!(
                "Query-result caching is enabled but no Row-Level Security policies are \
                 declared in the compiled schema. This is safe for single-tenant deployments. \
                 For multi-tenant deployments, declare RLS policies and set \
                 `security.multi_tenant = true` in your schema."
            );
        }

        // The cache configuration is derived solely from the `cache_enabled` flag.
        let cache_config = CacheConfig::from(config.cache_enabled);
        let cache = QueryResultCache::new(cache_config);

        if cache_config.enabled {
            tracing::info!(
                max_entries = cache_config.max_entries,
                ttl_seconds = cache_config.ttl_seconds,
                rls_enforcement = ?cache_config.rls_enforcement,
                "Query result cache: active"
            );
        } else {
            tracing::info!("Query result cache: disabled");
        }

        // Copied out now because `schema` is moved into an `Arc` further down.
        let subscriptions_config = schema.subscriptions_config.clone();

        // The cached adapter needs an owned `A`, so the caller must hand over
        // the only strong reference; otherwise this panics (see `# Panics`).
        let inner = Arc::into_inner(adapter)
            .expect("CachedDatabaseAdapter wrapping requires exclusive Arc ownership at startup");
        // NOTE(review): the schema content hash is presumably used to tie cache
        // entries to this schema version — confirm in `CachedDatabaseAdapter`.
        let cached = CachedDatabaseAdapter::new(inner, cache, schema.content_hash())
            .with_ttl_overrides_from_schema(&schema);
        let executor = Arc::new(Executor::new(schema.clone(), Arc::new(cached)));
        let subscription_manager = Arc::new(SubscriptionManager::new(Arc::new(schema)));

        let mut server = Self::from_executor(
            config,
            executor,
            subscription_manager,
            circuit_breaker,
            error_sanitizer,
            state_encryption,
            pkce_store,
            oidc_server_client,
            schema_rate_limiter,
            api_key_authenticator,
            revocation_manager,
            trusted_docs,
            db_pool,
        )
        .await?;

        // `from_executor` initialises this to `false`; record the real state.
        server.adapter_cache_enabled = cache_config.enabled;

        // Pool tuning is applied only when present and enabled; the clone is
        // needed because `with_pool_tuning` consumes `server`.
        if let Some(pt) = server.config.pool_tuning.clone() {
            if pt.enabled {
                server = server
                    .with_pool_tuning(pt)
                    .map_err(|e| ServerError::ConfigError(format!("pool_tuning: {e}")))?;
            }
        }

        // MCP: copy the schema's MCP config onto the server when enabled and
        // log how many tools the schema yields.
        #[cfg(feature = "mcp")]
        if let Some(ref cfg) = server.executor.schema().mcp_config {
            if cfg.enabled {
                let tool_count =
                    crate::mcp::tools::schema_to_tools(server.executor.schema(), cfg).len();
                info!(
                    path = %cfg.path,
                    transport = %cfg.transport,
                    tools = tool_count,
                    "MCP server configured"
                );
                server.mcp_config = Some(cfg.clone());
            }
        }

        // APQ uses the in-memory storage backend when enabled.
        if server.config.apq_enabled {
            let apq_store: fraiseql_core::apq::ArcApqStorage =
                Arc::new(fraiseql_core::apq::InMemoryApqStorage::default());
            server.apq_store = Some(apq_store);
            info!("APQ (Automatic Persisted Queries) enabled — in-memory backend");
        }

        // Subscription settings from the schema override the defaults set by
        // `from_executor` (no per-connection limit, no-op lifecycle).
        if let Some(ref subs) = subscriptions_config {
            if let Some(max) = subs.max_subscriptions_per_connection {
                server.max_subscriptions_per_connection = Some(max);
            }
            if let Some(lifecycle) = crate::subscriptions::WebhookLifecycle::from_config(subs) {
                server.subscription_lifecycle = Arc::new(lifecycle);
            }
        }

        Ok(server)
    }
249}
250
251impl<A: DatabaseAdapter + Clone + Send + Sync + 'static> Server<A> {
    /// Assembles a [`Server`] from an already-built executor and the
    /// subsystems derived from the compiled schema.
    ///
    /// Besides storing its arguments, this function:
    ///
    /// - initialises the OIDC validator when `config.auth` is set;
    /// - builds the HS256 middleware via [`build_hs256_auth`];
    /// - selects the rate limiter (`schema_rate_limiter` takes precedence over
    ///   `config.rate_limiting`, which is used only when it is enabled);
    /// - initialises the observer runtime (`observers` feature);
    /// - creates the Arrow Flight service (`arrow` feature);
    /// - with the `auth` feature, checks the PKCE/OIDC configuration and
    ///   spawns a background task that periodically purges expired PKCE state.
    ///
    /// Fields not covered by the arguments start at neutral defaults: no-op
    /// subscription lifecycle, no per-connection subscription limit, no APQ
    /// store, no MCP config, no pool tuning, adapter cache flag `false`.
    ///
    /// # Errors
    ///
    /// Returns [`ServerError::ConfigError`] when the OIDC validator or the
    /// HS256 secret cannot be initialised. With the `auth` feature, an error
    /// from `check_redis_requirement` is propagated as well.
    #[allow(clippy::too_many_arguments)]
    // Sequential wiring with many feature-gated steps; kept in one function so
    // the ordering stays visible.
    #[allow(clippy::cognitive_complexity)]
    pub(super) async fn from_executor(
        config: ServerConfig,
        executor: Arc<Executor<A>>,
        subscription_manager: Arc<SubscriptionManager>,
        #[cfg(feature = "federation")] circuit_breaker: Option<
            Arc<crate::federation::circuit_breaker::FederationCircuitBreakerManager>,
        >,
        #[cfg(not(feature = "federation"))] _circuit_breaker: Option<()>,
        error_sanitizer: Arc<crate::config::error_sanitization::ErrorSanitizer>,
        state_encryption: Option<Arc<crate::auth::state_encryption::StateEncryptionService>>,
        pkce_store: Option<Arc<crate::auth::PkceStateStore>>,
        oidc_server_client: Option<Arc<crate::auth::OidcServerClient>>,
        schema_rate_limiter: Option<Arc<RateLimiter>>,
        api_key_authenticator: Option<Arc<crate::api_key::ApiKeyAuthenticator>>,
        revocation_manager: Option<Arc<crate::token_revocation::TokenRevocationManager>>,
        trusted_docs: Option<Arc<crate::trusted_documents::TrustedDocumentStore>>,
        #[cfg_attr(not(feature = "observers"), allow(unused_variables))] db_pool: Option<
            sqlx::PgPool,
        >,
    ) -> Result<Self> {
        // OIDC validation is optional; a failure to initialise it aborts startup.
        let oidc_validator = if let Some(ref auth_config) = config.auth {
            info!(
                issuer = %auth_config.issuer,
                "Initializing OIDC authentication"
            );
            let validator = OidcValidator::new(auth_config.clone())
                .await
                .map_err(|e| ServerError::ConfigError(format!("Failed to initialize OIDC: {e}")))?;
            Some(Arc::new(validator))
        } else {
            None
        };

        let hs256_auth = build_hs256_auth(&config)?;

        // A rate limiter declared in the schema wins; the server config is the
        // fallback and is honoured only when it is explicitly enabled.
        let rate_limiter = if let Some(rl) = schema_rate_limiter {
            Some(rl)
        } else if let Some(ref rate_config) = config.rate_limiting {
            if rate_config.enabled {
                info!(
                    rps_per_ip = rate_config.rps_per_ip,
                    rps_per_user = rate_config.rps_per_user,
                    "Initializing rate limiting from server config"
                );
                Some(Arc::new(RateLimiter::new(rate_config.clone())))
            } else {
                info!("Rate limiting disabled by configuration");
                None
            }
        } else {
            None
        };

        #[cfg(feature = "observers")]
        let observer_runtime = Self::init_observer_runtime(&config, db_pool.as_ref()).await;

        // Arrow Flight reuses the OIDC validator when one exists; otherwise the
        // service is created without authentication.
        #[cfg(feature = "arrow")]
        let flight_service = {
            let mut service = FraiseQLFlightService::new();
            if let Some(ref validator) = oidc_validator {
                info!("Enabling OIDC authentication for Arrow Flight");
                service.set_oidc_validator(validator.clone());
            } else {
                info!("Arrow Flight initialized without authentication (dev mode)");
            }
            Some(service)
        };

        // A PKCE store without an OIDC client is a misconfiguration: it is
        // logged as an error, but startup continues.
        #[cfg(feature = "auth")]
        if pkce_store.is_some() && oidc_server_client.is_none() {
            tracing::error!(
                "pkce.enabled = true but [auth] is not configured or OIDC client init failed. \
                 Auth routes (/auth/start, /auth/callback) will NOT be mounted. \
                 Add [auth] with discovery_url, client_id, client_secret_env, and \
                 server_redirect_uri to fraiseql.toml and recompile the schema."
            );
        }

        #[cfg(feature = "auth")]
        Self::check_redis_requirement(pkce_store.as_ref())?;

        // Background cleanup of expired PKCE state on a 300-second interval,
        // configured to skip missed ticks. The join handle is discarded, so
        // nothing in this function stops the task.
        #[cfg(feature = "auth")]
        if let Some(ref store) = pkce_store {
            use std::time::Duration;

            use tokio::time::MissedTickBehavior;
            let store_clone = Arc::clone(store);
            tokio::spawn(async move {
                let mut ticker = tokio::time::interval(Duration::from_secs(300));
                ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
                loop {
                    ticker.tick().await;
                    store_clone.cleanup_expired().await;
                }
            });
        }

        // Without the `auth` feature the struct has no fields for these values;
        // consume them explicitly so they do not trigger unused warnings.
        #[cfg(not(feature = "auth"))]
        let _ = (state_encryption, pkce_store, oidc_server_client);
        Ok(Self {
            config,
            executor,
            subscription_manager,
            // Defaults below may be overridden by the caller after construction.
            subscription_lifecycle: Arc::new(crate::subscriptions::NoopLifecycle),
            max_subscriptions_per_connection: None,
            oidc_validator,
            hs256_auth,
            rate_limiter,
            #[cfg(feature = "secrets")]
            secrets_manager: None,
            #[cfg(feature = "federation")]
            circuit_breaker,
            error_sanitizer,
            #[cfg(feature = "auth")]
            state_encryption,
            #[cfg(feature = "auth")]
            pkce_store,
            #[cfg(feature = "auth")]
            oidc_server_client,
            api_key_authenticator,
            revocation_manager,
            apq_store: None,
            trusted_docs,
            #[cfg(feature = "observers")]
            observer_runtime,
            #[cfg(feature = "observers")]
            db_pool,
            #[cfg(feature = "arrow")]
            flight_service,
            #[cfg(feature = "mcp")]
            mcp_config: None,
            pool_tuning_config: None,
            adapter_cache_enabled: false,
        })
    }
407
408 #[must_use]
410 pub fn with_subscription_lifecycle(
411 mut self,
412 lifecycle: Arc<dyn crate::subscriptions::SubscriptionLifecycle>,
413 ) -> Self {
414 self.subscription_lifecycle = lifecycle;
415 self
416 }
417
418 #[must_use]
420 pub const fn with_max_subscriptions_per_connection(mut self, max: u32) -> Self {
421 self.max_subscriptions_per_connection = Some(max);
422 self
423 }
424
425 pub fn with_pool_tuning(
434 mut self,
435 config: crate::config::pool_tuning::PoolPressureMonitorConfig,
436 ) -> std::result::Result<Self, String> {
437 config.validate()?;
438 self.pool_tuning_config = Some(config);
439 Ok(self)
440 }
441
442 #[cfg(feature = "secrets")]
446 pub fn set_secrets_manager(&mut self, manager: Arc<crate::secrets_manager::SecretsManager>) {
447 self.secrets_manager = Some(manager);
448 info!("Secrets manager attached to server");
449 }
450
451 #[cfg(feature = "mcp")]
461 pub async fn serve_mcp_stdio(self) -> Result<()> {
462 use rmcp::ServiceExt;
463
464 let mcp_cfg = self.mcp_config.ok_or_else(|| {
465 ServerError::ConfigError(
466 "FRAISEQL_MCP_STDIO=1 but MCP is not configured. \
467 Add [mcp] enabled = true to fraiseql.toml and recompile the schema."
468 .into(),
469 )
470 })?;
471
472 let schema = Arc::new(self.executor.schema().clone());
473 let executor = self.executor.clone();
474
475 let service = crate::mcp::handler::FraiseQLMcpService::new(schema, executor, mcp_cfg);
476
477 info!("MCP stdio transport starting — reading from stdin, writing to stdout");
478
479 let running = service
480 .serve((tokio::io::stdin(), tokio::io::stdout()))
481 .await
482 .map_err(|e| ServerError::ConfigError(format!("MCP stdio init failed: {e}")))?;
483
484 running
485 .waiting()
486 .await
487 .map_err(|e| ServerError::ConfigError(format!("MCP stdio error: {e}")))?;
488
489 Ok(())
490 }
491}