1use std::sync::Arc;
4
5#[cfg(feature = "arrow")]
6use fraiseql_arrow::FraiseQLFlightService;
7use fraiseql_core::{
8 cache::{CacheConfig, CachedDatabaseAdapter, QueryResultCache},
9 db::traits::DatabaseAdapter,
10 runtime::{Executor, SubscriptionManager},
11 schema::CompiledSchema,
12 security::OidcValidator,
13};
14use tracing::{info, warn};
15
16use super::{RateLimiter, Result, Server, ServerConfig, ServerError};
17
18impl<A: DatabaseAdapter + Clone + Send + Sync + 'static> Server<CachedDatabaseAdapter<A>> {
19 #[allow(clippy::cognitive_complexity)] pub async fn new(
57 config: ServerConfig,
58 schema: CompiledSchema,
59 adapter: Arc<A>,
60 db_pool: Option<sqlx::PgPool>,
61 ) -> Result<Self> {
62 if schema.schema_format_version.is_none() {
65 warn!(
66 "Loaded schema has no schema_format_version (pre-v2.1 format). \
67 Re-compile with the current fraiseql-cli for version compatibility checking."
68 );
69 }
70 schema.validate_format_version().map_err(|msg| {
71 ServerError::ConfigError(format!("Incompatible compiled schema: {msg}"))
72 })?;
73
74 #[cfg(feature = "federation")]
76 let circuit_breaker = schema.federation.as_ref().and_then(
77 crate::federation::circuit_breaker::FederationCircuitBreakerManager::from_config,
78 );
79 #[cfg(not(feature = "federation"))]
80 let circuit_breaker: Option<()> = None;
81 #[cfg(not(feature = "federation"))]
82 let _ = &schema.federation;
83 let error_sanitizer = Self::error_sanitizer_from_schema(&schema);
84 #[cfg(feature = "auth")]
85 let state_encryption = Self::state_encryption_from_schema(&schema)?;
86 #[cfg(not(feature = "auth"))]
87 let state_encryption: Option<
88 std::sync::Arc<crate::auth::state_encryption::StateEncryptionService>,
89 > = None;
90 #[cfg(feature = "auth")]
91 let pkce_store = Self::pkce_store_from_schema(&schema, state_encryption.as_ref()).await;
92 #[cfg(not(feature = "auth"))]
93 let pkce_store: Option<std::sync::Arc<crate::auth::PkceStateStore>> = None;
94 #[cfg(feature = "auth")]
95 let oidc_server_client = Self::oidc_server_client_from_schema(&schema);
96 #[cfg(not(feature = "auth"))]
97 let oidc_server_client: Option<std::sync::Arc<crate::auth::OidcServerClient>> = None;
98 let schema_rate_limiter = Self::rate_limiter_from_schema(&schema).await;
99 let api_key_authenticator = crate::api_key::api_key_authenticator_from_schema(&schema);
100 if api_key_authenticator.is_some() {
101 info!("API key authentication enabled");
102 }
103 let revocation_manager = crate::token_revocation::revocation_manager_from_schema(&schema);
104 if revocation_manager.is_some() {
105 info!("Token revocation enabled");
106 }
107 let trusted_docs = Self::trusted_docs_from_schema(&schema);
108
109 if config.cache_enabled && !schema.has_rls_configured() {
114 if schema.is_multi_tenant() {
115 return Err(ServerError::ConfigError(
117 "Cache is enabled in a multi-tenant schema but no Row-Level Security \
118 policies are declared. This would allow cross-tenant cache hits and \
119 data leakage. In fraiseql.toml, either disable caching with \
120 [cache] enabled = false, declare [security.rls] policies, or set \
121 [security] multi_tenant = false to acknowledge single-tenant mode."
122 .to_string(),
123 ));
124 }
125 warn!(
127 "Query-result caching is enabled but no Row-Level Security policies are \
128 declared in the compiled schema. This is safe for single-tenant deployments. \
129 For multi-tenant deployments, declare RLS policies and set \
130 `security.multi_tenant = true` in your schema."
131 );
132 }
133
134 let cache_config = CacheConfig::from(config.cache_enabled);
136 let cache = QueryResultCache::new(cache_config);
137
138 if cache_config.enabled {
140 tracing::info!(
141 max_entries = cache_config.max_entries,
142 ttl_seconds = cache_config.ttl_seconds,
143 rls_enforcement = ?cache_config.rls_enforcement,
144 "Query result cache: active"
145 );
146 } else {
147 tracing::info!("Query result cache: disabled");
148 }
149
150 let subscriptions_config = schema.subscriptions_config.clone();
152
153 let inner = Arc::into_inner(adapter)
155 .expect("CachedDatabaseAdapter wrapping requires exclusive Arc ownership at startup");
156 let cached = CachedDatabaseAdapter::new(inner, cache, schema.content_hash())
157 .with_ttl_overrides_from_schema(&schema);
158 let executor = Arc::new(Executor::new(schema.clone(), Arc::new(cached)));
159 let subscription_manager = Arc::new(SubscriptionManager::new(Arc::new(schema)));
160
161 let mut server = Self::from_executor(
162 config,
163 executor,
164 subscription_manager,
165 circuit_breaker,
166 error_sanitizer,
167 state_encryption,
168 pkce_store,
169 oidc_server_client,
170 schema_rate_limiter,
171 api_key_authenticator,
172 revocation_manager,
173 trusted_docs,
174 db_pool,
175 )
176 .await?;
177
178 server.adapter_cache_enabled = cache_config.enabled;
179
180 if let Some(pt) = server.config.pool_tuning.clone() {
182 if pt.enabled {
183 server = server
184 .with_pool_tuning(pt)
185 .map_err(|e| ServerError::ConfigError(format!("pool_tuning: {e}")))?;
186 }
187 }
188
189 #[cfg(feature = "mcp")]
191 if let Some(ref cfg) = server.executor.schema().mcp_config {
192 if cfg.enabled {
193 let tool_count =
194 crate::mcp::tools::schema_to_tools(server.executor.schema(), cfg).len();
195 info!(
196 path = %cfg.path,
197 transport = %cfg.transport,
198 tools = tool_count,
199 "MCP server configured"
200 );
201 server.mcp_config = Some(cfg.clone());
202 }
203 }
204
205 if server.config.apq_enabled {
207 let apq_store: fraiseql_core::apq::ArcApqStorage =
208 Arc::new(fraiseql_core::apq::InMemoryApqStorage::default());
209 server.apq_store = Some(apq_store);
210 info!("APQ (Automatic Persisted Queries) enabled — in-memory backend");
211 }
212
213 if let Some(ref subs) = subscriptions_config {
215 if let Some(max) = subs.max_subscriptions_per_connection {
216 server.max_subscriptions_per_connection = Some(max);
217 }
218 if let Some(lifecycle) = crate::subscriptions::WebhookLifecycle::from_config(subs) {
219 server.subscription_lifecycle = Arc::new(lifecycle);
220 }
221 }
222
223 Ok(server)
224 }
225}
226
227impl<A: DatabaseAdapter + Clone + Send + Sync + 'static> Server<A> {
228 #[allow(clippy::too_many_arguments)]
233 #[allow(clippy::cognitive_complexity)] pub(super) async fn from_executor(
237 config: ServerConfig,
238 executor: Arc<Executor<A>>,
239 subscription_manager: Arc<SubscriptionManager>,
240 #[cfg(feature = "federation")] circuit_breaker: Option<
241 Arc<crate::federation::circuit_breaker::FederationCircuitBreakerManager>,
242 >,
243 #[cfg(not(feature = "federation"))] _circuit_breaker: Option<()>,
244 error_sanitizer: Arc<crate::config::error_sanitization::ErrorSanitizer>,
245 state_encryption: Option<Arc<crate::auth::state_encryption::StateEncryptionService>>,
246 pkce_store: Option<Arc<crate::auth::PkceStateStore>>,
247 oidc_server_client: Option<Arc<crate::auth::OidcServerClient>>,
248 schema_rate_limiter: Option<Arc<RateLimiter>>,
249 api_key_authenticator: Option<Arc<crate::api_key::ApiKeyAuthenticator>>,
250 revocation_manager: Option<Arc<crate::token_revocation::TokenRevocationManager>>,
251 trusted_docs: Option<Arc<crate::trusted_documents::TrustedDocumentStore>>,
252 #[cfg_attr(not(feature = "observers"), allow(unused_variables))] db_pool: Option<
255 sqlx::PgPool,
256 >,
257 ) -> Result<Self> {
258 let oidc_validator = if let Some(ref auth_config) = config.auth {
260 info!(
261 issuer = %auth_config.issuer,
262 "Initializing OIDC authentication"
263 );
264 let validator = OidcValidator::new(auth_config.clone())
265 .await
266 .map_err(|e| ServerError::ConfigError(format!("Failed to initialize OIDC: {e}")))?;
267 Some(Arc::new(validator))
268 } else {
269 None
270 };
271
272 let rate_limiter = if let Some(rl) = schema_rate_limiter {
274 Some(rl)
275 } else if let Some(ref rate_config) = config.rate_limiting {
276 if rate_config.enabled {
277 info!(
278 rps_per_ip = rate_config.rps_per_ip,
279 rps_per_user = rate_config.rps_per_user,
280 "Initializing rate limiting from server config"
281 );
282 Some(Arc::new(RateLimiter::new(rate_config.clone())))
283 } else {
284 info!("Rate limiting disabled by configuration");
285 None
286 }
287 } else {
288 None
289 };
290
291 #[cfg(feature = "observers")]
293 let observer_runtime = Self::init_observer_runtime(&config, db_pool.as_ref()).await;
294
295 #[cfg(feature = "arrow")]
297 let flight_service = {
298 let mut service = FraiseQLFlightService::new();
299 if let Some(ref validator) = oidc_validator {
300 info!("Enabling OIDC authentication for Arrow Flight");
301 service.set_oidc_validator(validator.clone());
302 } else {
303 info!("Arrow Flight initialized without authentication (dev mode)");
304 }
305 Some(service)
306 };
307
308 #[cfg(feature = "auth")]
310 if pkce_store.is_some() && oidc_server_client.is_none() {
311 tracing::error!(
312 "pkce.enabled = true but [auth] is not configured or OIDC client init failed. \
313 Auth routes (/auth/start, /auth/callback) will NOT be mounted. \
314 Add [auth] with discovery_url, client_id, client_secret_env, and \
315 server_redirect_uri to fraiseql.toml and recompile the schema."
316 );
317 }
318
319 #[cfg(feature = "auth")]
321 Self::check_redis_requirement(pkce_store.as_ref())?;
322
323 #[cfg(feature = "auth")]
325 if let Some(ref store) = pkce_store {
326 use std::time::Duration;
327
328 use tokio::time::MissedTickBehavior;
329 let store_clone = Arc::clone(store);
330 tokio::spawn(async move {
331 let mut ticker = tokio::time::interval(Duration::from_secs(300));
332 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
333 loop {
334 ticker.tick().await;
335 store_clone.cleanup_expired().await;
336 }
337 });
338 }
339
340 #[cfg(not(feature = "auth"))]
343 let _ = (state_encryption, pkce_store, oidc_server_client);
344 Ok(Self {
345 config,
346 executor,
347 subscription_manager,
348 subscription_lifecycle: Arc::new(crate::subscriptions::NoopLifecycle),
349 max_subscriptions_per_connection: None,
350 oidc_validator,
351 rate_limiter,
352 #[cfg(feature = "secrets")]
353 secrets_manager: None,
354 #[cfg(feature = "federation")]
355 circuit_breaker,
356 error_sanitizer,
357 #[cfg(feature = "auth")]
358 state_encryption,
359 #[cfg(feature = "auth")]
360 pkce_store,
361 #[cfg(feature = "auth")]
362 oidc_server_client,
363 api_key_authenticator,
364 revocation_manager,
365 apq_store: None,
366 trusted_docs,
367 #[cfg(feature = "observers")]
368 observer_runtime,
369 #[cfg(feature = "observers")]
370 db_pool,
371 #[cfg(feature = "arrow")]
372 flight_service,
373 #[cfg(feature = "mcp")]
374 mcp_config: None,
375 pool_tuning_config: None,
376 adapter_cache_enabled: false,
377 })
378 }
379
380 #[must_use]
382 pub fn with_subscription_lifecycle(
383 mut self,
384 lifecycle: Arc<dyn crate::subscriptions::SubscriptionLifecycle>,
385 ) -> Self {
386 self.subscription_lifecycle = lifecycle;
387 self
388 }
389
390 #[must_use]
392 pub const fn with_max_subscriptions_per_connection(mut self, max: u32) -> Self {
393 self.max_subscriptions_per_connection = Some(max);
394 self
395 }
396
397 pub fn with_pool_tuning(
406 mut self,
407 config: crate::config::pool_tuning::PoolPressureMonitorConfig,
408 ) -> std::result::Result<Self, String> {
409 config.validate()?;
410 self.pool_tuning_config = Some(config);
411 Ok(self)
412 }
413
414 #[cfg(feature = "secrets")]
418 pub fn set_secrets_manager(&mut self, manager: Arc<crate::secrets_manager::SecretsManager>) {
419 self.secrets_manager = Some(manager);
420 info!("Secrets manager attached to server");
421 }
422
423 #[cfg(feature = "mcp")]
433 pub async fn serve_mcp_stdio(self) -> Result<()> {
434 use rmcp::ServiceExt;
435
436 let mcp_cfg = self.mcp_config.ok_or_else(|| {
437 ServerError::ConfigError(
438 "FRAISEQL_MCP_STDIO=1 but MCP is not configured. \
439 Add [mcp] enabled = true to fraiseql.toml and recompile the schema."
440 .into(),
441 )
442 })?;
443
444 let schema = Arc::new(self.executor.schema().clone());
445 let executor = self.executor.clone();
446
447 let service = crate::mcp::handler::FraiseQLMcpService::new(schema, executor, mcp_cfg);
448
449 info!("MCP stdio transport starting — reading from stdin, writing to stdout");
450
451 let running = service
452 .serve((tokio::io::stdin(), tokio::io::stdout()))
453 .await
454 .map_err(|e| ServerError::ConfigError(format!("MCP stdio init failed: {e}")))?;
455
456 running
457 .waiting()
458 .await
459 .map_err(|e| ServerError::ConfigError(format!("MCP stdio error: {e}")))?;
460
461 Ok(())
462 }
463}