1use std::sync::Arc;
4
5#[cfg(feature = "arrow")]
6use fraiseql_arrow::FraiseQLFlightService;
7use fraiseql_core::{
8 db::traits::DatabaseAdapter,
9 runtime::{Executor, SubscriptionManager},
10 schema::CompiledSchema,
11 security::OidcValidator,
12};
13use tracing::{info, warn};
14
15use super::{RateLimiter, Result, Server, ServerConfig, ServerError};
16
17impl<A: DatabaseAdapter + Clone + Send + Sync + 'static> Server<A> {
18 #[allow(clippy::cognitive_complexity)] pub async fn new(
50 config: ServerConfig,
51 schema: CompiledSchema,
52 adapter: Arc<A>,
53 db_pool: Option<sqlx::PgPool>,
54 ) -> Result<Self> {
55 if schema.schema_format_version.is_none() {
58 warn!(
59 "Loaded schema has no schema_format_version (pre-v2.1 format). \
60 Re-compile with the current fraiseql-cli for version compatibility checking."
61 );
62 }
63 schema.validate_format_version().map_err(|msg| {
64 ServerError::ConfigError(format!("Incompatible compiled schema: {msg}"))
65 })?;
66
67 #[cfg(feature = "federation")]
69 let circuit_breaker = schema.federation.as_ref().and_then(
70 crate::federation::circuit_breaker::FederationCircuitBreakerManager::from_config,
71 );
72 #[cfg(not(feature = "federation"))]
73 let circuit_breaker: Option<()> = None;
74 #[cfg(not(feature = "federation"))]
75 let _ = &schema.federation;
76 let error_sanitizer = Self::error_sanitizer_from_schema(&schema);
77 #[cfg(feature = "auth")]
78 let state_encryption = Self::state_encryption_from_schema(&schema)?;
79 #[cfg(not(feature = "auth"))]
80 let state_encryption: Option<
81 std::sync::Arc<crate::auth::state_encryption::StateEncryptionService>,
82 > = None;
83 #[cfg(feature = "auth")]
84 let pkce_store = Self::pkce_store_from_schema(&schema, state_encryption.as_ref()).await;
85 #[cfg(not(feature = "auth"))]
86 let pkce_store: Option<std::sync::Arc<crate::auth::PkceStateStore>> = None;
87 #[cfg(feature = "auth")]
88 let oidc_server_client = Self::oidc_server_client_from_schema(&schema);
89 #[cfg(not(feature = "auth"))]
90 let oidc_server_client: Option<std::sync::Arc<crate::auth::OidcServerClient>> = None;
91 let schema_rate_limiter = Self::rate_limiter_from_schema(&schema).await;
92 let api_key_authenticator = crate::api_key::api_key_authenticator_from_schema(&schema);
93 if api_key_authenticator.is_some() {
94 info!("API key authentication enabled");
95 }
96 let revocation_manager = crate::token_revocation::revocation_manager_from_schema(&schema);
97 if revocation_manager.is_some() {
98 info!("Token revocation enabled");
99 }
100 let trusted_docs = Self::trusted_docs_from_schema(&schema);
101
102 if config.cache_enabled && !schema.has_rls_configured() {
107 if schema.is_multi_tenant() {
108 return Err(ServerError::ConfigError(
110 "Cache is enabled in a multi-tenant schema but no Row-Level Security \
111 policies are declared. This would allow cross-tenant cache hits and \
112 data leakage. In fraiseql.toml, either disable caching with \
113 [cache] enabled = false, declare [security.rls] policies, or set \
114 [security] multi_tenant = false to acknowledge single-tenant mode."
115 .to_string(),
116 ));
117 }
118 warn!(
120 "Query-result caching is enabled but no Row-Level Security policies are \
121 declared in the compiled schema. This is safe for single-tenant deployments. \
122 For multi-tenant deployments, declare RLS policies and set \
123 `security.multi_tenant = true` in your schema."
124 );
125 }
126
127 let subscriptions_config = schema.subscriptions_config.clone();
129
130 let executor = Arc::new(Executor::new(schema.clone(), adapter));
131 let subscription_manager = Arc::new(SubscriptionManager::new(Arc::new(schema)));
132
133 let mut server = Self::from_executor(
134 config,
135 executor,
136 subscription_manager,
137 circuit_breaker,
138 error_sanitizer,
139 state_encryption,
140 pkce_store,
141 oidc_server_client,
142 schema_rate_limiter,
143 api_key_authenticator,
144 revocation_manager,
145 trusted_docs,
146 db_pool,
147 )
148 .await?;
149
150 if let Some(pt) = server.config.pool_tuning.clone() {
152 if pt.enabled {
153 server = server
154 .with_pool_tuning(pt)
155 .map_err(|e| ServerError::ConfigError(format!("pool_tuning: {e}")))?;
156 }
157 }
158
159 #[cfg(feature = "mcp")]
161 if let Some(ref cfg) = server.executor.schema().mcp_config {
162 if cfg.enabled {
163 let tool_count =
164 crate::mcp::tools::schema_to_tools(server.executor.schema(), cfg).len();
165 info!(
166 path = %cfg.path,
167 transport = %cfg.transport,
168 tools = tool_count,
169 "MCP server configured"
170 );
171 server.mcp_config = Some(cfg.clone());
172 }
173 }
174
175 if server.config.apq_enabled {
177 let apq_store: fraiseql_core::apq::ArcApqStorage =
178 Arc::new(fraiseql_core::apq::InMemoryApqStorage::default());
179 server.apq_store = Some(apq_store);
180 info!("APQ (Automatic Persisted Queries) enabled — in-memory backend");
181 }
182
183 if let Some(ref subs) = subscriptions_config {
185 if let Some(max) = subs.max_subscriptions_per_connection {
186 server.max_subscriptions_per_connection = Some(max);
187 }
188 if let Some(lifecycle) = crate::subscriptions::WebhookLifecycle::from_config(subs) {
189 server.subscription_lifecycle = Arc::new(lifecycle);
190 }
191 }
192
193 Ok(server)
194 }
195
196 #[allow(clippy::too_many_arguments)]
201 #[allow(clippy::cognitive_complexity)] pub(super) async fn from_executor(
205 config: ServerConfig,
206 executor: Arc<Executor<A>>,
207 subscription_manager: Arc<SubscriptionManager>,
208 #[cfg(feature = "federation")] circuit_breaker: Option<
209 Arc<crate::federation::circuit_breaker::FederationCircuitBreakerManager>,
210 >,
211 #[cfg(not(feature = "federation"))] _circuit_breaker: Option<()>,
212 error_sanitizer: Arc<crate::config::error_sanitization::ErrorSanitizer>,
213 state_encryption: Option<Arc<crate::auth::state_encryption::StateEncryptionService>>,
214 pkce_store: Option<Arc<crate::auth::PkceStateStore>>,
215 oidc_server_client: Option<Arc<crate::auth::OidcServerClient>>,
216 schema_rate_limiter: Option<Arc<RateLimiter>>,
217 api_key_authenticator: Option<Arc<crate::api_key::ApiKeyAuthenticator>>,
218 revocation_manager: Option<Arc<crate::token_revocation::TokenRevocationManager>>,
219 trusted_docs: Option<Arc<crate::trusted_documents::TrustedDocumentStore>>,
220 #[cfg_attr(not(feature = "observers"), allow(unused_variables))] db_pool: Option<
223 sqlx::PgPool,
224 >,
225 ) -> Result<Self> {
226 let oidc_validator = if let Some(ref auth_config) = config.auth {
228 info!(
229 issuer = %auth_config.issuer,
230 "Initializing OIDC authentication"
231 );
232 let validator = OidcValidator::new(auth_config.clone())
233 .await
234 .map_err(|e| ServerError::ConfigError(format!("Failed to initialize OIDC: {e}")))?;
235 Some(Arc::new(validator))
236 } else {
237 None
238 };
239
240 let rate_limiter = if let Some(rl) = schema_rate_limiter {
242 Some(rl)
243 } else if let Some(ref rate_config) = config.rate_limiting {
244 if rate_config.enabled {
245 info!(
246 rps_per_ip = rate_config.rps_per_ip,
247 rps_per_user = rate_config.rps_per_user,
248 "Initializing rate limiting from server config"
249 );
250 Some(Arc::new(RateLimiter::new(rate_config.clone())))
251 } else {
252 info!("Rate limiting disabled by configuration");
253 None
254 }
255 } else {
256 None
257 };
258
259 #[cfg(feature = "observers")]
261 let observer_runtime = Self::init_observer_runtime(&config, db_pool.as_ref()).await;
262
263 #[cfg(feature = "arrow")]
265 let flight_service = {
266 let mut service = FraiseQLFlightService::new();
267 if let Some(ref validator) = oidc_validator {
268 info!("Enabling OIDC authentication for Arrow Flight");
269 service.set_oidc_validator(validator.clone());
270 } else {
271 info!("Arrow Flight initialized without authentication (dev mode)");
272 }
273 Some(service)
274 };
275
276 #[cfg(feature = "auth")]
278 if pkce_store.is_some() && oidc_server_client.is_none() {
279 tracing::error!(
280 "pkce.enabled = true but [auth] is not configured or OIDC client init failed. \
281 Auth routes (/auth/start, /auth/callback) will NOT be mounted. \
282 Add [auth] with discovery_url, client_id, client_secret_env, and \
283 server_redirect_uri to fraiseql.toml and recompile the schema."
284 );
285 }
286
287 #[cfg(feature = "auth")]
289 Self::check_redis_requirement(pkce_store.as_ref())?;
290
291 #[cfg(feature = "auth")]
293 if let Some(ref store) = pkce_store {
294 use std::time::Duration;
295
296 use tokio::time::MissedTickBehavior;
297 let store_clone = Arc::clone(store);
298 tokio::spawn(async move {
299 let mut ticker = tokio::time::interval(Duration::from_secs(300));
300 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
301 loop {
302 ticker.tick().await;
303 store_clone.cleanup_expired().await;
304 }
305 });
306 }
307
308 #[cfg(not(feature = "auth"))]
311 let _ = (state_encryption, pkce_store, oidc_server_client);
312 Ok(Self {
313 config,
314 executor,
315 subscription_manager,
316 subscription_lifecycle: Arc::new(crate::subscriptions::NoopLifecycle),
317 max_subscriptions_per_connection: None,
318 oidc_validator,
319 rate_limiter,
320 #[cfg(feature = "secrets")]
321 secrets_manager: None,
322 #[cfg(feature = "federation")]
323 circuit_breaker,
324 error_sanitizer,
325 #[cfg(feature = "auth")]
326 state_encryption,
327 #[cfg(feature = "auth")]
328 pkce_store,
329 #[cfg(feature = "auth")]
330 oidc_server_client,
331 api_key_authenticator,
332 revocation_manager,
333 apq_store: None,
334 trusted_docs,
335 #[cfg(feature = "observers")]
336 observer_runtime,
337 #[cfg(feature = "observers")]
338 db_pool,
339 #[cfg(feature = "arrow")]
340 flight_service,
341 #[cfg(feature = "mcp")]
342 mcp_config: None,
343 pool_tuning_config: None,
344 })
345 }
346
347 #[must_use]
349 pub fn with_subscription_lifecycle(
350 mut self,
351 lifecycle: Arc<dyn crate::subscriptions::SubscriptionLifecycle>,
352 ) -> Self {
353 self.subscription_lifecycle = lifecycle;
354 self
355 }
356
357 #[must_use]
359 pub const fn with_max_subscriptions_per_connection(mut self, max: u32) -> Self {
360 self.max_subscriptions_per_connection = Some(max);
361 self
362 }
363
364 pub fn with_pool_tuning(
373 mut self,
374 config: crate::config::pool_tuning::PoolPressureMonitorConfig,
375 ) -> std::result::Result<Self, String> {
376 config.validate()?;
377 self.pool_tuning_config = Some(config);
378 Ok(self)
379 }
380
381 #[cfg(feature = "secrets")]
385 pub fn set_secrets_manager(&mut self, manager: Arc<crate::secrets_manager::SecretsManager>) {
386 self.secrets_manager = Some(manager);
387 info!("Secrets manager attached to server");
388 }
389
390 #[cfg(feature = "mcp")]
400 pub async fn serve_mcp_stdio(self) -> Result<()> {
401 use rmcp::ServiceExt;
402
403 let mcp_cfg = self.mcp_config.ok_or_else(|| {
404 ServerError::ConfigError(
405 "FRAISEQL_MCP_STDIO=1 but MCP is not configured. \
406 Add [mcp] enabled = true to fraiseql.toml and recompile the schema."
407 .into(),
408 )
409 })?;
410
411 let schema = Arc::new(self.executor.schema().clone());
412 let executor = self.executor.clone();
413
414 let service = crate::mcp::handler::FraiseQLMcpService::new(schema, executor, mcp_cfg);
415
416 info!("MCP stdio transport starting — reading from stdin, writing to stdout");
417
418 let running = service
419 .serve((tokio::io::stdin(), tokio::io::stdout()))
420 .await
421 .map_err(|e| ServerError::ConfigError(format!("MCP stdio init failed: {e}")))?;
422
423 running
424 .waiting()
425 .await
426 .map_err(|e| ServerError::ConfigError(format!("MCP stdio error: {e}")))?;
427
428 Ok(())
429 }
430}