Skip to main content

fraiseql_server/server/
extensions.rs

1//! Server extensions: relay pagination, Arrow Flight service, and observer runtime
2//! initialization.
3
4use std::sync::Arc;
5
6#[cfg(feature = "arrow")]
7use fraiseql_arrow::FraiseQLFlightService;
8#[cfg(all(feature = "arrow", feature = "auth"))]
9use fraiseql_core::security::OidcValidator;
10use fraiseql_core::{
11    cache::{CacheConfig, CachedDatabaseAdapter, QueryResultCache},
12    db::traits::{DatabaseAdapter, RelayDatabaseAdapter},
13    runtime::{Executor, SubscriptionManager},
14    schema::CompiledSchema,
15};
16#[cfg(feature = "observers")]
17use tokio::sync::RwLock;
18use tracing::info;
19#[cfg(feature = "observers")]
20use tracing::warn;
21
22#[cfg(feature = "arrow")]
23use super::RateLimiter;
24#[cfg(all(feature = "arrow", feature = "auth"))]
25use super::ServerError;
26#[cfg(feature = "observers")]
27use super::{ObserverRuntime, ObserverRuntimeConfig};
28use super::{Result, Server, ServerConfig};
29
30impl<A: DatabaseAdapter + RelayDatabaseAdapter + Clone + Send + Sync + 'static>
31    Server<CachedDatabaseAdapter<A>>
32{
33    /// Create a server with relay pagination support enabled.
34    ///
35    /// The adapter must implement [`RelayDatabaseAdapter`]. Currently, only
36    /// `PostgresAdapter` and `CachedDatabaseAdapter<PostgresAdapter>` satisfy this bound.
37    ///
38    /// Relay queries issued against a server created with [`Server::new`] return a
39    /// `Validation` error at runtime; those issued against a server created with this
40    /// constructor succeed.
41    ///
42    /// # Arguments
43    ///
44    /// * `config` - Server configuration
45    /// * `schema` - Compiled GraphQL schema
46    /// * `adapter` - Database adapter (must implement `RelayDatabaseAdapter`)
47    /// * `db_pool` - Database connection pool (optional, required for observers)
48    ///
49    /// # Errors
50    ///
51    /// Returns error if OIDC validator initialization fails.
52    ///
53    /// # Panics
54    ///
55    /// Panics if the `adapter` `Arc` has been cloned before calling this constructor
56    /// (refcount > 1). The builder must have exclusive ownership to unwrap the adapter
57    /// for `CachedDatabaseAdapter` construction.
58    ///
59    /// # Example
60    ///
61    /// ```text
62    /// // Requires: running PostgreSQL database and compiled schema file.
63    /// let adapter = Arc::new(PostgresAdapter::new(db_url).await?);
64    /// let server = Server::with_relay_pagination(config, schema, adapter, None).await?;
65    /// server.serve().await?;
66    /// ```
67    pub async fn with_relay_pagination(
68        config: ServerConfig,
69        schema: CompiledSchema,
70        adapter: Arc<A>,
71        db_pool: Option<sqlx::PgPool>,
72    ) -> Result<Self> {
73        // Validate cache + RLS safety (mirrors Server::new).
74        if config.cache_enabled && !schema.has_rls_configured() {
75            if schema.is_multi_tenant() {
76                return Err(super::ServerError::ConfigError(
77                    "Cache is enabled in a multi-tenant schema but no Row-Level Security \
78                     policies are declared. This would allow cross-tenant cache hits and \
79                     data leakage. In fraiseql.toml, either disable caching with \
80                     [cache] enabled = false, declare [security.rls] policies, or set \
81                     [security] multi_tenant = false to acknowledge single-tenant mode."
82                        .to_string(),
83                ));
84            }
85            tracing::warn!(
86                "Query-result caching is enabled but no Row-Level Security policies are \
87                 declared in the compiled schema. This is safe for single-tenant deployments."
88            );
89        }
90
91        // Read security configs from compiled schema BEFORE schema is moved.
92        #[cfg(feature = "federation")]
93        let circuit_breaker = schema.federation.as_ref().and_then(
94            crate::federation::circuit_breaker::FederationCircuitBreakerManager::from_config,
95        );
96        #[cfg(not(feature = "federation"))]
97        let circuit_breaker: Option<()> = None;
98        #[cfg(not(feature = "federation"))]
99        let _ = &schema.federation;
100        let error_sanitizer = Self::error_sanitizer_from_schema(&schema);
101        #[cfg(feature = "auth")]
102        let state_encryption = Self::state_encryption_from_schema(&schema)?;
103        #[cfg(not(feature = "auth"))]
104        let state_encryption: Option<
105            std::sync::Arc<crate::auth::state_encryption::StateEncryptionService>,
106        > = None;
107        #[cfg(feature = "auth")]
108        let pkce_store = Self::pkce_store_from_schema(&schema, state_encryption.as_ref()).await;
109        #[cfg(not(feature = "auth"))]
110        let pkce_store: Option<std::sync::Arc<crate::auth::PkceStateStore>> = None;
111        #[cfg(feature = "auth")]
112        let oidc_server_client = Self::oidc_server_client_from_schema(&schema);
113        #[cfg(not(feature = "auth"))]
114        let oidc_server_client: Option<std::sync::Arc<crate::auth::OidcServerClient>> = None;
115        let schema_rate_limiter = Self::rate_limiter_from_schema(&schema).await;
116        let api_key_authenticator = crate::api_key::api_key_authenticator_from_schema(&schema);
117        let revocation_manager = crate::token_revocation::revocation_manager_from_schema(&schema);
118        let trusted_docs = Self::trusted_docs_from_schema(&schema);
119
120        let cache_config = CacheConfig::from(config.cache_enabled);
121        let cache = QueryResultCache::new(cache_config);
122        // Unwrap Arc: refcount is 1 here — adapter has not been cloned since being passed in.
123        let inner = Arc::into_inner(adapter)
124            .expect("CachedDatabaseAdapter wrapping requires exclusive Arc ownership at startup");
125        let cached = CachedDatabaseAdapter::new(inner, cache, schema.content_hash())
126            .with_ttl_overrides_from_schema(&schema);
127        let executor = Arc::new(Executor::new_with_relay(schema.clone(), Arc::new(cached)));
128        let subscription_manager = Arc::new(SubscriptionManager::new(Arc::new(schema)));
129
130        let mut server = Self::from_executor(
131            config,
132            executor,
133            subscription_manager,
134            circuit_breaker,
135            error_sanitizer,
136            state_encryption,
137            pkce_store,
138            oidc_server_client,
139            schema_rate_limiter,
140            api_key_authenticator,
141            revocation_manager,
142            trusted_docs,
143            db_pool,
144        )
145        .await?;
146
147        server.adapter_cache_enabled = cache_config.enabled;
148
149        // Initialize MCP config from compiled schema when the feature is compiled in.
150        #[cfg(feature = "mcp")]
151        if let Some(ref cfg) = server.executor.schema().mcp_config {
152            if cfg.enabled {
153                let tool_count =
154                    crate::mcp::tools::schema_to_tools(server.executor.schema(), cfg).len();
155                info!(
156                    path = %cfg.path,
157                    transport = %cfg.transport,
158                    tools = tool_count,
159                    "MCP server configured"
160                );
161                server.mcp_config = Some(cfg.clone());
162            }
163        }
164
165        // Initialize APQ store when enabled.
166        if server.config.apq_enabled {
167            let apq_store: fraiseql_core::apq::ArcApqStorage =
168                Arc::new(fraiseql_core::apq::InMemoryApqStorage::default());
169            server.apq_store = Some(apq_store);
170            info!("APQ (Automatic Persisted Queries) enabled — in-memory backend");
171        }
172
173        Ok(server)
174    }
175}
176
177impl<A: DatabaseAdapter + Clone + Send + Sync + 'static> Server<A> {
178    /// Create new server with pre-configured Arrow Flight service.
179    ///
180    /// Use this constructor when you want to provide a Flight service with a real database adapter.
181    ///
182    /// # Arguments
183    ///
184    /// * `config` - Server configuration
185    /// * `schema` - Compiled GraphQL schema
186    /// * `adapter` - Database adapter
187    /// * `db_pool` - Database connection pool (optional, required for observers)
188    /// * `flight_service` - Pre-configured Flight service (only available with arrow feature)
189    ///
190    /// # Errors
191    ///
192    /// Returns error if OIDC validator initialization fails.
193    #[cfg(feature = "arrow")]
194    pub async fn with_flight_service(
195        config: ServerConfig,
196        schema: CompiledSchema,
197        adapter: Arc<A>,
198        #[allow(unused_variables)]
199        // Reason: used inside #[cfg(feature = "observers")] block; unused when feature is off
200        db_pool: Option<sqlx::PgPool>,
201        flight_service: Option<FraiseQLFlightService>,
202    ) -> Result<Self> {
203        // Read security configs from compiled schema BEFORE schema is moved.
204        #[cfg(feature = "federation")]
205        let circuit_breaker = schema.federation.as_ref().and_then(
206            crate::federation::circuit_breaker::FederationCircuitBreakerManager::from_config,
207        );
208        #[cfg(not(feature = "federation"))]
209        let _circuit_breaker: Option<()> = None;
210        #[cfg(not(feature = "federation"))]
211        let _ = &schema.federation;
212        let error_sanitizer = Self::error_sanitizer_from_schema(&schema);
213        #[cfg(feature = "auth")]
214        let state_encryption = Self::state_encryption_from_schema(&schema)?;
215        #[cfg(not(feature = "auth"))]
216        let _state_encryption: Option<
217            std::sync::Arc<crate::auth::state_encryption::StateEncryptionService>,
218        > = None;
219        #[cfg(feature = "auth")]
220        let pkce_store = Self::pkce_store_from_schema(&schema, state_encryption.as_ref()).await;
221        #[cfg(not(feature = "auth"))]
222        let _pkce_store: Option<std::sync::Arc<crate::auth::PkceStateStore>> = None;
223        #[cfg(feature = "auth")]
224        let oidc_server_client = Self::oidc_server_client_from_schema(&schema);
225        #[cfg(not(feature = "auth"))]
226        let _oidc_server_client: Option<std::sync::Arc<crate::auth::OidcServerClient>> = None;
227        let schema_rate_limiter = Self::rate_limiter_from_schema(&schema).await;
228        let api_key_authenticator = crate::api_key::api_key_authenticator_from_schema(&schema);
229        let revocation_manager = crate::token_revocation::revocation_manager_from_schema(&schema);
230        let trusted_docs = Self::trusted_docs_from_schema(&schema);
231
232        let executor = Arc::new(Executor::new(schema.clone(), adapter));
233        let subscription_manager = Arc::new(SubscriptionManager::new(Arc::new(schema)));
234
235        // Initialize OIDC validator if auth is configured
236        #[cfg(feature = "auth")]
237        let oidc_validator = if let Some(ref auth_config) = config.auth {
238            info!(
239                issuer = %auth_config.issuer,
240                "Initializing OIDC authentication"
241            );
242            let validator = OidcValidator::new(auth_config.clone())
243                .await
244                .map_err(|e| ServerError::ConfigError(format!("Failed to initialize OIDC: {e}")))?;
245            Some(Arc::new(validator))
246        } else {
247            None
248        };
249        #[cfg(not(feature = "auth"))]
250        let oidc_validator: Option<Arc<fraiseql_core::security::OidcValidator>> = None;
251
252        // Initialize rate limiter: compiled schema config takes priority over server config.
253        let rate_limiter = if let Some(rl) = schema_rate_limiter {
254            Some(rl)
255        } else if let Some(ref rate_config) = config.rate_limiting {
256            if rate_config.enabled {
257                info!(
258                    rps_per_ip = rate_config.rps_per_ip,
259                    rps_per_user = rate_config.rps_per_user,
260                    "Initializing rate limiting from server config"
261                );
262                Some(Arc::new(RateLimiter::new(rate_config.clone())))
263            } else {
264                info!("Rate limiting disabled by configuration");
265                None
266            }
267        } else {
268            None
269        };
270
271        // Initialize observer runtime
272        #[cfg(feature = "observers")]
273        let observer_runtime = Self::init_observer_runtime(&config, db_pool.as_ref()).await;
274
275        // Warn if PKCE is configured but [auth] is missing.
276        #[cfg(feature = "auth")]
277        if pkce_store.is_some() && oidc_server_client.is_none() {
278            tracing::error!(
279                "pkce.enabled = true but [auth] is not configured or OIDC client init failed. \
280                 Auth routes will NOT be mounted."
281            );
282        }
283
284        // Refuse to start if FRAISEQL_REQUIRE_REDIS is set and PKCE store is in-memory.
285        #[cfg(feature = "auth")]
286        Self::check_redis_requirement(pkce_store.as_ref())?;
287
288        // Spawn background PKCE state cleanup task (every 5 minutes).
289        #[cfg(feature = "auth")]
290        if let Some(ref store) = pkce_store {
291            use std::time::Duration;
292
293            use tokio::time::MissedTickBehavior;
294            let store_clone = Arc::clone(store);
295            tokio::spawn(async move {
296                let mut ticker = tokio::time::interval(Duration::from_secs(300));
297                ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
298                loop {
299                    ticker.tick().await;
300                    store_clone.cleanup_expired().await;
301                }
302            });
303        }
304
305        let apq_enabled = config.apq_enabled;
306
307        Ok(Self {
308            config,
309            executor,
310            subscription_manager,
311            subscription_lifecycle: Arc::new(crate::subscriptions::NoopLifecycle),
312            max_subscriptions_per_connection: None,
313            oidc_validator,
314            rate_limiter,
315            #[cfg(feature = "secrets")]
316            secrets_manager: None,
317            #[cfg(feature = "federation")]
318            circuit_breaker,
319            error_sanitizer,
320            #[cfg(feature = "auth")]
321            state_encryption,
322            #[cfg(feature = "auth")]
323            pkce_store,
324            #[cfg(feature = "auth")]
325            oidc_server_client,
326            api_key_authenticator,
327            revocation_manager,
328            apq_store: if apq_enabled {
329                Some(Arc::new(fraiseql_core::apq::InMemoryApqStorage::default())
330                    as fraiseql_core::apq::ArcApqStorage)
331            } else {
332                None
333            },
334            trusted_docs,
335            #[cfg(feature = "mcp")]
336            mcp_config: None,
337            pool_tuning_config: None,
338            #[cfg(feature = "observers")]
339            observer_runtime,
340            #[cfg(feature = "observers")]
341            db_pool,
342            flight_service,
343            adapter_cache_enabled: false,
344        })
345    }
346
347    /// Initialize observer runtime from configuration
348    #[cfg(feature = "observers")]
349    pub(super) async fn init_observer_runtime(
350        config: &ServerConfig,
351        pool: Option<&sqlx::PgPool>,
352    ) -> Option<Arc<RwLock<ObserverRuntime>>> {
353        // Check if enabled
354        let observer_config = match &config.observers {
355            Some(cfg) if cfg.enabled => cfg,
356            _ => {
357                info!("Observer runtime disabled");
358                return None;
359            },
360        };
361
362        let Some(pool) = pool else {
363            warn!("No database pool provided for observers");
364            return None;
365        };
366
367        info!("Initializing observer runtime");
368
369        let runtime_config = ObserverRuntimeConfig::new(pool.clone())
370            .with_poll_interval(observer_config.poll_interval_ms)
371            .with_batch_size(observer_config.batch_size)
372            .with_channel_capacity(observer_config.channel_capacity);
373
374        let runtime = ObserverRuntime::new(runtime_config);
375        Some(Arc::new(RwLock::new(runtime)))
376    }
377}