Skip to main content

fraiseql_server/server/
extensions.rs

1//! Server extensions: relay pagination, Arrow Flight service, and observer runtime
2//! initialization.
3
4use std::sync::Arc;
5
6#[cfg(feature = "arrow")]
7use fraiseql_arrow::FraiseQLFlightService;
8#[cfg(all(feature = "arrow", feature = "auth"))]
9use fraiseql_core::security::OidcValidator;
10use fraiseql_core::{
11    db::traits::{DatabaseAdapter, RelayDatabaseAdapter},
12    runtime::{Executor, SubscriptionManager},
13    schema::CompiledSchema,
14};
15#[cfg(feature = "observers")]
16use tokio::sync::RwLock;
17use tracing::info;
18#[cfg(feature = "observers")]
19use tracing::warn;
20
21#[cfg(feature = "arrow")]
22use super::RateLimiter;
23#[cfg(all(feature = "arrow", feature = "auth"))]
24use super::ServerError;
25#[cfg(feature = "observers")]
26use super::{ObserverRuntime, ObserverRuntimeConfig};
27use super::{Result, Server, ServerConfig};
28
29impl<A: DatabaseAdapter + RelayDatabaseAdapter + Clone + Send + Sync + 'static> Server<A> {
30    /// Create a server with relay pagination support enabled.
31    ///
32    /// The adapter must implement [`RelayDatabaseAdapter`]. Currently, only
33    /// `PostgresAdapter` and `CachedDatabaseAdapter<PostgresAdapter>` satisfy this bound.
34    ///
35    /// Relay queries issued against a server created with [`Server::new`] return a
36    /// `Validation` error at runtime; those issued against a server created with this
37    /// constructor succeed.
38    ///
39    /// # Arguments
40    ///
41    /// * `config` - Server configuration
42    /// * `schema` - Compiled GraphQL schema
43    /// * `adapter` - Database adapter (must implement `RelayDatabaseAdapter`)
44    /// * `db_pool` - Database connection pool (optional, required for observers)
45    ///
46    /// # Errors
47    ///
48    /// Returns error if OIDC validator initialization fails.
49    ///
50    /// # Example
51    ///
52    /// ```text
53    /// // Requires: running PostgreSQL database and compiled schema file.
54    /// let adapter = Arc::new(PostgresAdapter::new(db_url).await?);
55    /// let server = Server::with_relay_pagination(config, schema, adapter, None).await?;
56    /// server.serve().await?;
57    /// ```
58    pub async fn with_relay_pagination(
59        config: ServerConfig,
60        schema: CompiledSchema,
61        adapter: Arc<A>,
62        db_pool: Option<sqlx::PgPool>,
63    ) -> Result<Self> {
64        // Read security configs from compiled schema BEFORE schema is moved.
65        #[cfg(feature = "federation")]
66        let circuit_breaker = schema.federation.as_ref().and_then(
67            crate::federation::circuit_breaker::FederationCircuitBreakerManager::from_config,
68        );
69        #[cfg(not(feature = "federation"))]
70        let circuit_breaker: Option<()> = None;
71        #[cfg(not(feature = "federation"))]
72        let _ = &schema.federation;
73        let error_sanitizer = Self::error_sanitizer_from_schema(&schema);
74        #[cfg(feature = "auth")]
75        let state_encryption = Self::state_encryption_from_schema(&schema)?;
76        #[cfg(not(feature = "auth"))]
77        let state_encryption: Option<
78            std::sync::Arc<crate::auth::state_encryption::StateEncryptionService>,
79        > = None;
80        #[cfg(feature = "auth")]
81        let pkce_store = Self::pkce_store_from_schema(&schema, state_encryption.as_ref()).await;
82        #[cfg(not(feature = "auth"))]
83        let pkce_store: Option<std::sync::Arc<crate::auth::PkceStateStore>> = None;
84        #[cfg(feature = "auth")]
85        let oidc_server_client = Self::oidc_server_client_from_schema(&schema);
86        #[cfg(not(feature = "auth"))]
87        let oidc_server_client: Option<std::sync::Arc<crate::auth::OidcServerClient>> = None;
88        let schema_rate_limiter = Self::rate_limiter_from_schema(&schema).await;
89        let api_key_authenticator = crate::api_key::api_key_authenticator_from_schema(&schema);
90        let revocation_manager = crate::token_revocation::revocation_manager_from_schema(&schema);
91        let trusted_docs = Self::trusted_docs_from_schema(&schema);
92
93        let executor = Arc::new(Executor::new_with_relay(schema.clone(), adapter));
94        let subscription_manager = Arc::new(SubscriptionManager::new(Arc::new(schema)));
95
96        let mut server = Self::from_executor(
97            config,
98            executor,
99            subscription_manager,
100            circuit_breaker,
101            error_sanitizer,
102            state_encryption,
103            pkce_store,
104            oidc_server_client,
105            schema_rate_limiter,
106            api_key_authenticator,
107            revocation_manager,
108            trusted_docs,
109            db_pool,
110        )
111        .await?;
112
113        // Initialize MCP config from compiled schema when the feature is compiled in.
114        #[cfg(feature = "mcp")]
115        if let Some(ref cfg) = server.executor.schema().mcp_config {
116            if cfg.enabled {
117                let tool_count =
118                    crate::mcp::tools::schema_to_tools(server.executor.schema(), cfg).len();
119                info!(
120                    path = %cfg.path,
121                    transport = %cfg.transport,
122                    tools = tool_count,
123                    "MCP server configured"
124                );
125                server.mcp_config = Some(cfg.clone());
126            }
127        }
128
129        // Initialize APQ store when enabled.
130        if server.config.apq_enabled {
131            let apq_store: fraiseql_core::apq::ArcApqStorage =
132                Arc::new(fraiseql_core::apq::InMemoryApqStorage::default());
133            server.apq_store = Some(apq_store);
134            info!("APQ (Automatic Persisted Queries) enabled — in-memory backend");
135        }
136
137        Ok(server)
138    }
139}
140
141impl<A: DatabaseAdapter + Clone + Send + Sync + 'static> Server<A> {
142    /// Create new server with pre-configured Arrow Flight service.
143    ///
144    /// Use this constructor when you want to provide a Flight service with a real database adapter.
145    ///
146    /// # Arguments
147    ///
148    /// * `config` - Server configuration
149    /// * `schema` - Compiled GraphQL schema
150    /// * `adapter` - Database adapter
151    /// * `db_pool` - Database connection pool (optional, required for observers)
152    /// * `flight_service` - Pre-configured Flight service (only available with arrow feature)
153    ///
154    /// # Errors
155    ///
156    /// Returns error if OIDC validator initialization fails.
157    #[cfg(feature = "arrow")]
158    pub async fn with_flight_service(
159        config: ServerConfig,
160        schema: CompiledSchema,
161        adapter: Arc<A>,
162        #[allow(unused_variables)]
163        // Reason: used inside #[cfg(feature = "observers")] block; unused when feature is off
164        db_pool: Option<sqlx::PgPool>,
165        flight_service: Option<FraiseQLFlightService>,
166    ) -> Result<Self> {
167        // Read security configs from compiled schema BEFORE schema is moved.
168        #[cfg(feature = "federation")]
169        let circuit_breaker = schema.federation.as_ref().and_then(
170            crate::federation::circuit_breaker::FederationCircuitBreakerManager::from_config,
171        );
172        #[cfg(not(feature = "federation"))]
173        let _circuit_breaker: Option<()> = None;
174        #[cfg(not(feature = "federation"))]
175        let _ = &schema.federation;
176        let error_sanitizer = Self::error_sanitizer_from_schema(&schema);
177        #[cfg(feature = "auth")]
178        let state_encryption = Self::state_encryption_from_schema(&schema)?;
179        #[cfg(not(feature = "auth"))]
180        let _state_encryption: Option<
181            std::sync::Arc<crate::auth::state_encryption::StateEncryptionService>,
182        > = None;
183        #[cfg(feature = "auth")]
184        let pkce_store = Self::pkce_store_from_schema(&schema, state_encryption.as_ref()).await;
185        #[cfg(not(feature = "auth"))]
186        let _pkce_store: Option<std::sync::Arc<crate::auth::PkceStateStore>> = None;
187        #[cfg(feature = "auth")]
188        let oidc_server_client = Self::oidc_server_client_from_schema(&schema);
189        #[cfg(not(feature = "auth"))]
190        let _oidc_server_client: Option<std::sync::Arc<crate::auth::OidcServerClient>> = None;
191        let schema_rate_limiter = Self::rate_limiter_from_schema(&schema).await;
192        let api_key_authenticator = crate::api_key::api_key_authenticator_from_schema(&schema);
193        let revocation_manager = crate::token_revocation::revocation_manager_from_schema(&schema);
194        let trusted_docs = Self::trusted_docs_from_schema(&schema);
195
196        let executor = Arc::new(Executor::new(schema.clone(), adapter));
197        let subscription_manager = Arc::new(SubscriptionManager::new(Arc::new(schema)));
198
199        // Initialize OIDC validator if auth is configured
200        #[cfg(feature = "auth")]
201        let oidc_validator = if let Some(ref auth_config) = config.auth {
202            info!(
203                issuer = %auth_config.issuer,
204                "Initializing OIDC authentication"
205            );
206            let validator = OidcValidator::new(auth_config.clone())
207                .await
208                .map_err(|e| ServerError::ConfigError(format!("Failed to initialize OIDC: {e}")))?;
209            Some(Arc::new(validator))
210        } else {
211            None
212        };
213        #[cfg(not(feature = "auth"))]
214        let oidc_validator: Option<Arc<fraiseql_core::security::OidcValidator>> = None;
215
216        // Initialize rate limiter: compiled schema config takes priority over server config.
217        let rate_limiter = if let Some(rl) = schema_rate_limiter {
218            Some(rl)
219        } else if let Some(ref rate_config) = config.rate_limiting {
220            if rate_config.enabled {
221                info!(
222                    rps_per_ip = rate_config.rps_per_ip,
223                    rps_per_user = rate_config.rps_per_user,
224                    "Initializing rate limiting from server config"
225                );
226                Some(Arc::new(RateLimiter::new(rate_config.clone())))
227            } else {
228                info!("Rate limiting disabled by configuration");
229                None
230            }
231        } else {
232            None
233        };
234
235        // Initialize observer runtime
236        #[cfg(feature = "observers")]
237        let observer_runtime = Self::init_observer_runtime(&config, db_pool.as_ref()).await;
238
239        // Warn if PKCE is configured but [auth] is missing.
240        #[cfg(feature = "auth")]
241        if pkce_store.is_some() && oidc_server_client.is_none() {
242            tracing::error!(
243                "pkce.enabled = true but [auth] is not configured or OIDC client init failed. \
244                 Auth routes will NOT be mounted."
245            );
246        }
247
248        // Refuse to start if FRAISEQL_REQUIRE_REDIS is set and PKCE store is in-memory.
249        #[cfg(feature = "auth")]
250        Self::check_redis_requirement(pkce_store.as_ref())?;
251
252        // Spawn background PKCE state cleanup task (every 5 minutes).
253        #[cfg(feature = "auth")]
254        if let Some(ref store) = pkce_store {
255            use std::time::Duration;
256
257            use tokio::time::MissedTickBehavior;
258            let store_clone = Arc::clone(store);
259            tokio::spawn(async move {
260                let mut ticker = tokio::time::interval(Duration::from_secs(300));
261                ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
262                loop {
263                    ticker.tick().await;
264                    store_clone.cleanup_expired().await;
265                }
266            });
267        }
268
269        let apq_enabled = config.apq_enabled;
270
271        Ok(Self {
272            config,
273            executor,
274            subscription_manager,
275            subscription_lifecycle: Arc::new(crate::subscriptions::NoopLifecycle),
276            max_subscriptions_per_connection: None,
277            oidc_validator,
278            rate_limiter,
279            #[cfg(feature = "secrets")]
280            secrets_manager: None,
281            #[cfg(feature = "federation")]
282            circuit_breaker,
283            error_sanitizer,
284            #[cfg(feature = "auth")]
285            state_encryption,
286            #[cfg(feature = "auth")]
287            pkce_store,
288            #[cfg(feature = "auth")]
289            oidc_server_client,
290            api_key_authenticator,
291            revocation_manager,
292            apq_store: if apq_enabled {
293                Some(Arc::new(fraiseql_core::apq::InMemoryApqStorage::default())
294                    as fraiseql_core::apq::ArcApqStorage)
295            } else {
296                None
297            },
298            trusted_docs,
299            #[cfg(feature = "mcp")]
300            mcp_config: None,
301            pool_tuning_config: None,
302            #[cfg(feature = "observers")]
303            observer_runtime,
304            #[cfg(feature = "observers")]
305            db_pool,
306            flight_service,
307        })
308    }
309
310    /// Initialize observer runtime from configuration
311    #[cfg(feature = "observers")]
312    pub(super) async fn init_observer_runtime(
313        config: &ServerConfig,
314        pool: Option<&sqlx::PgPool>,
315    ) -> Option<Arc<RwLock<ObserverRuntime>>> {
316        // Check if enabled
317        let observer_config = match &config.observers {
318            Some(cfg) if cfg.enabled => cfg,
319            _ => {
320                info!("Observer runtime disabled");
321                return None;
322            },
323        };
324
325        let Some(pool) = pool else {
326            warn!("No database pool provided for observers");
327            return None;
328        };
329
330        info!("Initializing observer runtime");
331
332        let runtime_config = ObserverRuntimeConfig::new(pool.clone())
333            .with_poll_interval(observer_config.poll_interval_ms)
334            .with_batch_size(observer_config.batch_size)
335            .with_channel_capacity(observer_config.channel_capacity);
336
337        let runtime = ObserverRuntime::new(runtime_config);
338        Some(Arc::new(RwLock::new(runtime)))
339    }
340}