fraiseql_server/server/
extensions.rs1use std::sync::Arc;
5
6#[cfg(feature = "arrow")]
7use fraiseql_arrow::FraiseQLFlightService;
8#[cfg(all(feature = "arrow", feature = "auth"))]
9use fraiseql_core::security::OidcValidator;
10use fraiseql_core::{
11 db::traits::{DatabaseAdapter, RelayDatabaseAdapter},
12 runtime::{Executor, SubscriptionManager},
13 schema::CompiledSchema,
14};
15#[cfg(feature = "observers")]
16use tokio::sync::RwLock;
17use tracing::info;
18#[cfg(feature = "observers")]
19use tracing::warn;
20
21#[cfg(feature = "arrow")]
22use super::RateLimiter;
23#[cfg(all(feature = "arrow", feature = "auth"))]
24use super::ServerError;
25#[cfg(feature = "observers")]
26use super::{ObserverRuntime, ObserverRuntimeConfig};
27use super::{Result, Server, ServerConfig};
28
29impl<A: DatabaseAdapter + RelayDatabaseAdapter + Clone + Send + Sync + 'static> Server<A> {
30 pub async fn with_relay_pagination(
59 config: ServerConfig,
60 schema: CompiledSchema,
61 adapter: Arc<A>,
62 db_pool: Option<sqlx::PgPool>,
63 ) -> Result<Self> {
64 #[cfg(feature = "federation")]
66 let circuit_breaker = schema.federation.as_ref().and_then(
67 crate::federation::circuit_breaker::FederationCircuitBreakerManager::from_config,
68 );
69 #[cfg(not(feature = "federation"))]
70 let circuit_breaker: Option<()> = None;
71 #[cfg(not(feature = "federation"))]
72 let _ = &schema.federation;
73 let error_sanitizer = Self::error_sanitizer_from_schema(&schema);
74 #[cfg(feature = "auth")]
75 let state_encryption = Self::state_encryption_from_schema(&schema)?;
76 #[cfg(not(feature = "auth"))]
77 let state_encryption: Option<
78 std::sync::Arc<crate::auth::state_encryption::StateEncryptionService>,
79 > = None;
80 #[cfg(feature = "auth")]
81 let pkce_store = Self::pkce_store_from_schema(&schema, state_encryption.as_ref()).await;
82 #[cfg(not(feature = "auth"))]
83 let pkce_store: Option<std::sync::Arc<crate::auth::PkceStateStore>> = None;
84 #[cfg(feature = "auth")]
85 let oidc_server_client = Self::oidc_server_client_from_schema(&schema);
86 #[cfg(not(feature = "auth"))]
87 let oidc_server_client: Option<std::sync::Arc<crate::auth::OidcServerClient>> = None;
88 let schema_rate_limiter = Self::rate_limiter_from_schema(&schema).await;
89 let api_key_authenticator = crate::api_key::api_key_authenticator_from_schema(&schema);
90 let revocation_manager = crate::token_revocation::revocation_manager_from_schema(&schema);
91 let trusted_docs = Self::trusted_docs_from_schema(&schema);
92
93 let executor = Arc::new(Executor::new_with_relay(schema.clone(), adapter));
94 let subscription_manager = Arc::new(SubscriptionManager::new(Arc::new(schema)));
95
96 let mut server = Self::from_executor(
97 config,
98 executor,
99 subscription_manager,
100 circuit_breaker,
101 error_sanitizer,
102 state_encryption,
103 pkce_store,
104 oidc_server_client,
105 schema_rate_limiter,
106 api_key_authenticator,
107 revocation_manager,
108 trusted_docs,
109 db_pool,
110 )
111 .await?;
112
113 #[cfg(feature = "mcp")]
115 if let Some(ref cfg) = server.executor.schema().mcp_config {
116 if cfg.enabled {
117 let tool_count =
118 crate::mcp::tools::schema_to_tools(server.executor.schema(), cfg).len();
119 info!(
120 path = %cfg.path,
121 transport = %cfg.transport,
122 tools = tool_count,
123 "MCP server configured"
124 );
125 server.mcp_config = Some(cfg.clone());
126 }
127 }
128
129 if server.config.apq_enabled {
131 let apq_store: fraiseql_core::apq::ArcApqStorage =
132 Arc::new(fraiseql_core::apq::InMemoryApqStorage::default());
133 server.apq_store = Some(apq_store);
134 info!("APQ (Automatic Persisted Queries) enabled — in-memory backend");
135 }
136
137 Ok(server)
138 }
139}
140
141impl<A: DatabaseAdapter + Clone + Send + Sync + 'static> Server<A> {
142 #[cfg(feature = "arrow")]
158 pub async fn with_flight_service(
159 config: ServerConfig,
160 schema: CompiledSchema,
161 adapter: Arc<A>,
162 #[allow(unused_variables)]
163 db_pool: Option<sqlx::PgPool>,
165 flight_service: Option<FraiseQLFlightService>,
166 ) -> Result<Self> {
167 #[cfg(feature = "federation")]
169 let circuit_breaker = schema.federation.as_ref().and_then(
170 crate::federation::circuit_breaker::FederationCircuitBreakerManager::from_config,
171 );
172 #[cfg(not(feature = "federation"))]
173 let _circuit_breaker: Option<()> = None;
174 #[cfg(not(feature = "federation"))]
175 let _ = &schema.federation;
176 let error_sanitizer = Self::error_sanitizer_from_schema(&schema);
177 #[cfg(feature = "auth")]
178 let state_encryption = Self::state_encryption_from_schema(&schema)?;
179 #[cfg(not(feature = "auth"))]
180 let _state_encryption: Option<
181 std::sync::Arc<crate::auth::state_encryption::StateEncryptionService>,
182 > = None;
183 #[cfg(feature = "auth")]
184 let pkce_store = Self::pkce_store_from_schema(&schema, state_encryption.as_ref()).await;
185 #[cfg(not(feature = "auth"))]
186 let _pkce_store: Option<std::sync::Arc<crate::auth::PkceStateStore>> = None;
187 #[cfg(feature = "auth")]
188 let oidc_server_client = Self::oidc_server_client_from_schema(&schema);
189 #[cfg(not(feature = "auth"))]
190 let _oidc_server_client: Option<std::sync::Arc<crate::auth::OidcServerClient>> = None;
191 let schema_rate_limiter = Self::rate_limiter_from_schema(&schema).await;
192 let api_key_authenticator = crate::api_key::api_key_authenticator_from_schema(&schema);
193 let revocation_manager = crate::token_revocation::revocation_manager_from_schema(&schema);
194 let trusted_docs = Self::trusted_docs_from_schema(&schema);
195
196 let executor = Arc::new(Executor::new(schema.clone(), adapter));
197 let subscription_manager = Arc::new(SubscriptionManager::new(Arc::new(schema)));
198
199 #[cfg(feature = "auth")]
201 let oidc_validator = if let Some(ref auth_config) = config.auth {
202 info!(
203 issuer = %auth_config.issuer,
204 "Initializing OIDC authentication"
205 );
206 let validator = OidcValidator::new(auth_config.clone())
207 .await
208 .map_err(|e| ServerError::ConfigError(format!("Failed to initialize OIDC: {e}")))?;
209 Some(Arc::new(validator))
210 } else {
211 None
212 };
213 #[cfg(not(feature = "auth"))]
214 let oidc_validator: Option<Arc<fraiseql_core::security::OidcValidator>> = None;
215
216 let rate_limiter = if let Some(rl) = schema_rate_limiter {
218 Some(rl)
219 } else if let Some(ref rate_config) = config.rate_limiting {
220 if rate_config.enabled {
221 info!(
222 rps_per_ip = rate_config.rps_per_ip,
223 rps_per_user = rate_config.rps_per_user,
224 "Initializing rate limiting from server config"
225 );
226 Some(Arc::new(RateLimiter::new(rate_config.clone())))
227 } else {
228 info!("Rate limiting disabled by configuration");
229 None
230 }
231 } else {
232 None
233 };
234
235 #[cfg(feature = "observers")]
237 let observer_runtime = Self::init_observer_runtime(&config, db_pool.as_ref()).await;
238
239 #[cfg(feature = "auth")]
241 if pkce_store.is_some() && oidc_server_client.is_none() {
242 tracing::error!(
243 "pkce.enabled = true but [auth] is not configured or OIDC client init failed. \
244 Auth routes will NOT be mounted."
245 );
246 }
247
248 #[cfg(feature = "auth")]
250 Self::check_redis_requirement(pkce_store.as_ref())?;
251
252 #[cfg(feature = "auth")]
254 if let Some(ref store) = pkce_store {
255 use std::time::Duration;
256
257 use tokio::time::MissedTickBehavior;
258 let store_clone = Arc::clone(store);
259 tokio::spawn(async move {
260 let mut ticker = tokio::time::interval(Duration::from_secs(300));
261 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
262 loop {
263 ticker.tick().await;
264 store_clone.cleanup_expired().await;
265 }
266 });
267 }
268
269 let apq_enabled = config.apq_enabled;
270
271 Ok(Self {
272 config,
273 executor,
274 subscription_manager,
275 subscription_lifecycle: Arc::new(crate::subscriptions::NoopLifecycle),
276 max_subscriptions_per_connection: None,
277 oidc_validator,
278 rate_limiter,
279 #[cfg(feature = "secrets")]
280 secrets_manager: None,
281 #[cfg(feature = "federation")]
282 circuit_breaker,
283 error_sanitizer,
284 #[cfg(feature = "auth")]
285 state_encryption,
286 #[cfg(feature = "auth")]
287 pkce_store,
288 #[cfg(feature = "auth")]
289 oidc_server_client,
290 api_key_authenticator,
291 revocation_manager,
292 apq_store: if apq_enabled {
293 Some(Arc::new(fraiseql_core::apq::InMemoryApqStorage::default())
294 as fraiseql_core::apq::ArcApqStorage)
295 } else {
296 None
297 },
298 trusted_docs,
299 #[cfg(feature = "mcp")]
300 mcp_config: None,
301 pool_tuning_config: None,
302 #[cfg(feature = "observers")]
303 observer_runtime,
304 #[cfg(feature = "observers")]
305 db_pool,
306 flight_service,
307 })
308 }
309
310 #[cfg(feature = "observers")]
312 pub(super) async fn init_observer_runtime(
313 config: &ServerConfig,
314 pool: Option<&sqlx::PgPool>,
315 ) -> Option<Arc<RwLock<ObserverRuntime>>> {
316 let observer_config = match &config.observers {
318 Some(cfg) if cfg.enabled => cfg,
319 _ => {
320 info!("Observer runtime disabled");
321 return None;
322 },
323 };
324
325 let Some(pool) = pool else {
326 warn!("No database pool provided for observers");
327 return None;
328 };
329
330 info!("Initializing observer runtime");
331
332 let runtime_config = ObserverRuntimeConfig::new(pool.clone())
333 .with_poll_interval(observer_config.poll_interval_ms)
334 .with_batch_size(observer_config.batch_size)
335 .with_channel_capacity(observer_config.channel_capacity);
336
337 let runtime = ObserverRuntime::new(runtime_config);
338 Some(Arc::new(RwLock::new(runtime)))
339 }
340}