fraiseql_server/server/
extensions.rs1use std::sync::Arc;
5
6#[cfg(feature = "arrow")]
7use fraiseql_arrow::FraiseQLFlightService;
8#[cfg(all(feature = "arrow", feature = "auth"))]
9use fraiseql_core::security::OidcValidator;
10use fraiseql_core::{
11 cache::{CacheConfig, CachedDatabaseAdapter, QueryResultCache},
12 db::traits::{DatabaseAdapter, RelayDatabaseAdapter},
13 runtime::{Executor, SubscriptionManager},
14 schema::CompiledSchema,
15};
16#[cfg(feature = "observers")]
17use tokio::sync::RwLock;
18use tracing::info;
19#[cfg(feature = "observers")]
20use tracing::warn;
21
22#[cfg(feature = "arrow")]
23use super::RateLimiter;
24#[cfg(all(feature = "arrow", feature = "auth"))]
25use super::ServerError;
26#[cfg(feature = "observers")]
27use super::{ObserverRuntime, ObserverRuntimeConfig};
28use super::{Result, Server, ServerConfig};
29
30impl<A: DatabaseAdapter + RelayDatabaseAdapter + Clone + Send + Sync + 'static>
31 Server<CachedDatabaseAdapter<A>>
32{
33 pub async fn with_relay_pagination(
68 config: ServerConfig,
69 schema: CompiledSchema,
70 adapter: Arc<A>,
71 db_pool: Option<sqlx::PgPool>,
72 ) -> Result<Self> {
73 if config.cache_enabled && !schema.has_rls_configured() {
75 if schema.is_multi_tenant() {
76 return Err(super::ServerError::ConfigError(
77 "Cache is enabled in a multi-tenant schema but no Row-Level Security \
78 policies are declared. This would allow cross-tenant cache hits and \
79 data leakage. In fraiseql.toml, either disable caching with \
80 [cache] enabled = false, declare [security.rls] policies, or set \
81 [security] multi_tenant = false to acknowledge single-tenant mode."
82 .to_string(),
83 ));
84 }
85 tracing::warn!(
86 "Query-result caching is enabled but no Row-Level Security policies are \
87 declared in the compiled schema. This is safe for single-tenant deployments."
88 );
89 }
90
91 #[cfg(feature = "federation")]
93 let circuit_breaker = schema.federation.as_ref().and_then(
94 crate::federation::circuit_breaker::FederationCircuitBreakerManager::from_config,
95 );
96 #[cfg(not(feature = "federation"))]
97 let circuit_breaker: Option<()> = None;
98 #[cfg(not(feature = "federation"))]
99 let _ = &schema.federation;
100 let error_sanitizer = Self::error_sanitizer_from_schema(&schema);
101 #[cfg(feature = "auth")]
102 let state_encryption = Self::state_encryption_from_schema(&schema)?;
103 #[cfg(not(feature = "auth"))]
104 let state_encryption: Option<
105 std::sync::Arc<crate::auth::state_encryption::StateEncryptionService>,
106 > = None;
107 #[cfg(feature = "auth")]
108 let pkce_store = Self::pkce_store_from_schema(&schema, state_encryption.as_ref()).await;
109 #[cfg(not(feature = "auth"))]
110 let pkce_store: Option<std::sync::Arc<crate::auth::PkceStateStore>> = None;
111 #[cfg(feature = "auth")]
112 let oidc_server_client = Self::oidc_server_client_from_schema(&schema);
113 #[cfg(not(feature = "auth"))]
114 let oidc_server_client: Option<std::sync::Arc<crate::auth::OidcServerClient>> = None;
115 let schema_rate_limiter = Self::rate_limiter_from_schema(&schema).await;
116 let api_key_authenticator = crate::api_key::api_key_authenticator_from_schema(&schema);
117 let revocation_manager = crate::token_revocation::revocation_manager_from_schema(&schema);
118 let trusted_docs = Self::trusted_docs_from_schema(&schema);
119
120 let cache_config = CacheConfig::from(config.cache_enabled);
121 let cache = QueryResultCache::new(cache_config);
122 let inner = Arc::into_inner(adapter)
124 .expect("CachedDatabaseAdapter wrapping requires exclusive Arc ownership at startup");
125 let cached = CachedDatabaseAdapter::new(inner, cache, schema.content_hash())
126 .with_ttl_overrides_from_schema(&schema);
127 let executor = Arc::new(Executor::new_with_relay(schema.clone(), Arc::new(cached)));
128 let subscription_manager = Arc::new(SubscriptionManager::new(Arc::new(schema)));
129
130 let mut server = Self::from_executor(
131 config,
132 executor,
133 subscription_manager,
134 circuit_breaker,
135 error_sanitizer,
136 state_encryption,
137 pkce_store,
138 oidc_server_client,
139 schema_rate_limiter,
140 api_key_authenticator,
141 revocation_manager,
142 trusted_docs,
143 db_pool,
144 )
145 .await?;
146
147 server.adapter_cache_enabled = cache_config.enabled;
148
149 #[cfg(feature = "mcp")]
151 if let Some(ref cfg) = server.executor.schema().mcp_config {
152 if cfg.enabled {
153 let tool_count =
154 crate::mcp::tools::schema_to_tools(server.executor.schema(), cfg).len();
155 info!(
156 path = %cfg.path,
157 transport = %cfg.transport,
158 tools = tool_count,
159 "MCP server configured"
160 );
161 server.mcp_config = Some(cfg.clone());
162 }
163 }
164
165 if server.config.apq_enabled {
167 let apq_store: fraiseql_core::apq::ArcApqStorage =
168 Arc::new(fraiseql_core::apq::InMemoryApqStorage::default());
169 server.apq_store = Some(apq_store);
170 info!("APQ (Automatic Persisted Queries) enabled — in-memory backend");
171 }
172
173 Ok(server)
174 }
175}
176
177impl<A: DatabaseAdapter + Clone + Send + Sync + 'static> Server<A> {
178 #[cfg(feature = "arrow")]
194 pub async fn with_flight_service(
195 config: ServerConfig,
196 schema: CompiledSchema,
197 adapter: Arc<A>,
198 #[allow(unused_variables)]
199 db_pool: Option<sqlx::PgPool>,
201 flight_service: Option<FraiseQLFlightService>,
202 ) -> Result<Self> {
203 #[cfg(feature = "federation")]
205 let circuit_breaker = schema.federation.as_ref().and_then(
206 crate::federation::circuit_breaker::FederationCircuitBreakerManager::from_config,
207 );
208 #[cfg(not(feature = "federation"))]
209 let _circuit_breaker: Option<()> = None;
210 #[cfg(not(feature = "federation"))]
211 let _ = &schema.federation;
212 let error_sanitizer = Self::error_sanitizer_from_schema(&schema);
213 #[cfg(feature = "auth")]
214 let state_encryption = Self::state_encryption_from_schema(&schema)?;
215 #[cfg(not(feature = "auth"))]
216 let _state_encryption: Option<
217 std::sync::Arc<crate::auth::state_encryption::StateEncryptionService>,
218 > = None;
219 #[cfg(feature = "auth")]
220 let pkce_store = Self::pkce_store_from_schema(&schema, state_encryption.as_ref()).await;
221 #[cfg(not(feature = "auth"))]
222 let _pkce_store: Option<std::sync::Arc<crate::auth::PkceStateStore>> = None;
223 #[cfg(feature = "auth")]
224 let oidc_server_client = Self::oidc_server_client_from_schema(&schema);
225 #[cfg(not(feature = "auth"))]
226 let _oidc_server_client: Option<std::sync::Arc<crate::auth::OidcServerClient>> = None;
227 let schema_rate_limiter = Self::rate_limiter_from_schema(&schema).await;
228 let api_key_authenticator = crate::api_key::api_key_authenticator_from_schema(&schema);
229 let revocation_manager = crate::token_revocation::revocation_manager_from_schema(&schema);
230 let trusted_docs = Self::trusted_docs_from_schema(&schema);
231
232 let executor = Arc::new(Executor::new(schema.clone(), adapter));
233 let subscription_manager = Arc::new(SubscriptionManager::new(Arc::new(schema)));
234
235 #[cfg(feature = "auth")]
237 let oidc_validator = if let Some(ref auth_config) = config.auth {
238 info!(
239 issuer = %auth_config.issuer,
240 "Initializing OIDC authentication"
241 );
242 let validator = OidcValidator::new(auth_config.clone())
243 .await
244 .map_err(|e| ServerError::ConfigError(format!("Failed to initialize OIDC: {e}")))?;
245 Some(Arc::new(validator))
246 } else {
247 None
248 };
249 #[cfg(not(feature = "auth"))]
250 let oidc_validator: Option<Arc<fraiseql_core::security::OidcValidator>> = None;
251
252 let hs256_auth = super::builder::build_hs256_auth(&config)?;
254
255 let rate_limiter = if let Some(rl) = schema_rate_limiter {
257 Some(rl)
258 } else if let Some(ref rate_config) = config.rate_limiting {
259 if rate_config.enabled {
260 info!(
261 rps_per_ip = rate_config.rps_per_ip,
262 rps_per_user = rate_config.rps_per_user,
263 "Initializing rate limiting from server config"
264 );
265 Some(Arc::new(RateLimiter::new(rate_config.clone())))
266 } else {
267 info!("Rate limiting disabled by configuration");
268 None
269 }
270 } else {
271 None
272 };
273
274 #[cfg(feature = "observers")]
276 let observer_runtime = Self::init_observer_runtime(&config, db_pool.as_ref()).await;
277
278 #[cfg(feature = "auth")]
280 if pkce_store.is_some() && oidc_server_client.is_none() {
281 tracing::error!(
282 "pkce.enabled = true but [auth] is not configured or OIDC client init failed. \
283 Auth routes will NOT be mounted."
284 );
285 }
286
287 #[cfg(feature = "auth")]
289 Self::check_redis_requirement(pkce_store.as_ref())?;
290
291 #[cfg(feature = "auth")]
293 if let Some(ref store) = pkce_store {
294 use std::time::Duration;
295
296 use tokio::time::MissedTickBehavior;
297 let store_clone = Arc::clone(store);
298 tokio::spawn(async move {
299 let mut ticker = tokio::time::interval(Duration::from_secs(300));
300 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
301 loop {
302 ticker.tick().await;
303 store_clone.cleanup_expired().await;
304 }
305 });
306 }
307
308 let apq_enabled = config.apq_enabled;
309
310 Ok(Self {
311 config,
312 executor,
313 subscription_manager,
314 subscription_lifecycle: Arc::new(crate::subscriptions::NoopLifecycle),
315 max_subscriptions_per_connection: None,
316 oidc_validator,
317 hs256_auth,
318 rate_limiter,
319 #[cfg(feature = "secrets")]
320 secrets_manager: None,
321 #[cfg(feature = "federation")]
322 circuit_breaker,
323 error_sanitizer,
324 #[cfg(feature = "auth")]
325 state_encryption,
326 #[cfg(feature = "auth")]
327 pkce_store,
328 #[cfg(feature = "auth")]
329 oidc_server_client,
330 api_key_authenticator,
331 revocation_manager,
332 apq_store: if apq_enabled {
333 Some(Arc::new(fraiseql_core::apq::InMemoryApqStorage::default())
334 as fraiseql_core::apq::ArcApqStorage)
335 } else {
336 None
337 },
338 trusted_docs,
339 #[cfg(feature = "mcp")]
340 mcp_config: None,
341 pool_tuning_config: None,
342 #[cfg(feature = "observers")]
343 observer_runtime,
344 #[cfg(feature = "observers")]
345 db_pool,
346 flight_service,
347 adapter_cache_enabled: false,
348 })
349 }
350
351 #[cfg(feature = "observers")]
353 pub(super) async fn init_observer_runtime(
354 config: &ServerConfig,
355 pool: Option<&sqlx::PgPool>,
356 ) -> Option<Arc<RwLock<ObserverRuntime>>> {
357 let observer_config = match &config.observers {
359 Some(cfg) if cfg.enabled => cfg,
360 _ => {
361 info!("Observer runtime disabled");
362 return None;
363 },
364 };
365
366 let Some(pool) = pool else {
367 warn!("No database pool provided for observers");
368 return None;
369 };
370
371 info!("Initializing observer runtime");
372
373 let runtime_config = ObserverRuntimeConfig::new(pool.clone())
374 .with_poll_interval(observer_config.poll_interval_ms)
375 .with_batch_size(observer_config.batch_size)
376 .with_channel_capacity(observer_config.channel_capacity);
377
378 let runtime = ObserverRuntime::new(runtime_config);
379 Some(Arc::new(RwLock::new(runtime)))
380 }
381}