Skip to main content

fraiseql_core/runtime/
mod.rs

1//! Runtime query executor - executes compiled queries.
2//!
3//! # Architecture
4//!
5//! The runtime loads a `CompiledSchema` and executes incoming GraphQL queries by:
6//! 1. Parsing the GraphQL query
7//! 2. Matching it to a compiled query template
8//! 3. Binding variables
9//! 4. Executing the pre-compiled SQL
10//! 5. Projecting JSONB results to GraphQL response
11//!
12//! # Key Concepts
13//!
14//! - **Zero runtime compilation**: All SQL is pre-compiled
15//! - **Pattern matching**: Match incoming query structure to templates
16//! - **Variable binding**: Safe parameter substitution
17//! - **Result projection**: JSONB → GraphQL JSON transformation
18//!
19//! # Example
20//!
21//! ```no_run
22//! // Requires: a compiled schema file and a live PostgreSQL database.
23//! // See: tests/integration/ for runnable examples.
24//! use fraiseql_core::runtime::Executor;
25//! use fraiseql_core::schema::CompiledSchema;
26//! use fraiseql_core::db::postgres::PostgresAdapter;
27//! use std::sync::Arc;
28//!
29//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
30//! # let schema_json = r#"{"types":[],"queries":[]}"#;
31//! // Load compiled schema
32//! let schema = CompiledSchema::from_json(schema_json)?;
33//!
34//! // Create executor with a concrete adapter implementation
35//! let adapter = Arc::new(PostgresAdapter::new("postgresql://localhost/mydb").await?);
36//! let executor = Executor::new(schema, adapter);
37//!
38//! // Execute GraphQL query
39//! let query = r#"query { users { id name } }"#;
40//! let result = executor.execute(query, None).await?;
41//!
42//! println!("{}", result);
43//! # Ok(())
44//! # }
45//! ```
46
47mod aggregate_parser;
48mod aggregate_projector;
49pub mod aggregation;
50mod executor;
51pub mod executor_adapter;
52mod explain;
53pub mod field_filter;
54pub mod input_validator;
55pub mod jsonb_strategy;
56mod matcher;
57pub mod mutation_result;
58mod planner;
59mod projection;
60pub mod query_tracing;
61pub mod relay;
62pub mod sql_logger;
63pub mod subscription;
64pub mod tenant_enforcer;
65pub mod window;
66mod window_parser;
67mod window_projector;
68
69use std::sync::Arc;
70
71pub use aggregate_parser::AggregateQueryParser;
72pub use aggregate_projector::AggregationProjector;
73pub use aggregation::{AggregationSqlGenerator, ParameterizedAggregationSql};
74pub use executor::{
75    Executor,
76    pipeline::{extract_root_field_names, is_multi_root, multi_root_queries_total},
77};
78pub use executor_adapter::ExecutorAdapter;
79pub use explain::{ExplainPlan, ExplainResult};
80pub use field_filter::{FieldAccessResult, can_access_field, classify_field_access, filter_fields};
81pub use jsonb_strategy::{JsonbOptimizationOptions, JsonbStrategy};
82pub use matcher::{QueryMatch, QueryMatcher, suggest_similar};
83pub use planner::{ExecutionPlan, QueryPlanner};
84pub use projection::{FieldMapping, ProjectionMapper, ResultProjector};
85pub use query_tracing::{
86    QueryExecutionTrace, QueryPhaseSpan, QueryTraceBuilder, create_phase_span, create_query_span,
87};
88pub use sql_logger::{SqlOperation, SqlQueryLog, SqlQueryLogBuilder, create_sql_span};
89pub use subscription::{
90    ActiveSubscription, DeliveryResult, KafkaAdapter, KafkaConfig, KafkaMessage, SubscriptionError,
91    SubscriptionEvent, SubscriptionId, SubscriptionManager, SubscriptionOperation,
92    SubscriptionPayload, TransportAdapter, TransportManager, WebhookAdapter, WebhookConfig,
93    WebhookPayload, extract_rls_conditions, protocol,
94};
95pub use tenant_enforcer::TenantEnforcer;
96
97/// Result of a bulk REST operation (collection-level PATCH/DELETE).
98#[derive(Debug, Clone)]
99pub struct BulkResult {
100    /// Number of rows affected.
101    pub affected_rows: u64,
102    /// Entities returned when `Prefer: return=representation` is set.
103    pub entities:      Option<Vec<serde_json::Value>>,
104}
105pub use window::{WindowSql, WindowSqlGenerator};
106pub use window_parser::WindowQueryParser;
107pub use window_projector::WindowProjector;
108
109use crate::security::{FieldFilter, FieldFilterConfig, QueryValidatorConfig, RLSPolicy};
110
111/// Runtime configuration for the FraiseQL query executor.
112///
113/// Controls safety limits, security policies, and performance tuning. All settings
114/// have production-safe defaults and can be overridden via the builder-style methods.
115///
116/// # Defaults
117///
118/// | Field | Default | Notes |
119/// |-------|---------|-------|
120/// | `cache_query_plans` | `true` | Caches parsed query plans for repeated queries |
121/// | `max_query_depth` | `10` | Prevents stack overflow on recursive GraphQL |
122/// | `max_query_complexity` | `1000` | Rough cost model; tune per workload |
123/// | `enable_tracing` | `false` | Emit `OpenTelemetry` spans for each query |
124/// | `query_timeout_ms` | `30 000` | Hard limit; 0 disables the timeout |
125/// | `field_filter` | `None` | No field-level access control |
126/// | `rls_policy` | `None` | No row-level security |
127///
128/// # Example
129///
130/// ```
131/// use fraiseql_core::runtime::RuntimeConfig;
132/// use fraiseql_core::security::FieldFilterConfig;
133///
134/// let config = RuntimeConfig {
135///     max_query_depth: 5,
136///     max_query_complexity: 500,
137///     enable_tracing: true,
138///     query_timeout_ms: 5_000,
139///     ..RuntimeConfig::default()
140/// }
141/// .with_field_filter(
142///     FieldFilterConfig::new()
143///         .protect_field("User", "salary")
144///         .protect_field("User", "ssn"),
145/// );
146/// ```
147pub struct RuntimeConfig {
148    /// Enable query plan caching.
149    pub cache_query_plans: bool,
150
151    /// Maximum query depth (prevents deeply nested queries).
152    pub max_query_depth: usize,
153
154    /// Maximum query complexity score.
155    pub max_query_complexity: usize,
156
157    /// Enable performance tracing.
158    pub enable_tracing: bool,
159
160    /// Optional field filter for access control.
161    /// When set, validates that users have required scopes to access fields.
162    pub field_filter: Option<FieldFilter>,
163
164    /// Optional row-level security (RLS) policy.
165    /// When set, evaluates access rules based on `SecurityContext` to determine
166    /// what rows a user can access (e.g., tenant isolation, owner-based access).
167    pub rls_policy: Option<Arc<dyn RLSPolicy>>,
168
169    /// Query timeout in milliseconds (0 = no timeout).
170    pub query_timeout_ms: u64,
171
172    /// JSONB field optimization strategy options
173    pub jsonb_optimization: JsonbOptimizationOptions,
174
175    /// Optional query validation config.
176    ///
177    /// When `Some`, `QueryValidator::validate()` runs at the start of every
178    /// `Executor::execute()` call, before any parsing or SQL dispatch.
179    /// This provides `DoS` protection for direct `fraiseql-core` embedders that
180    /// do not route through `fraiseql-server` (which already runs `RequestValidator`
181    /// at the HTTP layer). Enforces: query size, depth, complexity, and alias count
182    /// (alias amplification protection).
183    ///
184    /// Set `None` to disable (default) — useful when the caller applies
185    /// validation at a higher layer, or when `fraiseql-server` is in use.
186    pub query_validation: Option<QueryValidatorConfig>,
187}
188
189impl std::fmt::Debug for RuntimeConfig {
190    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
191        f.debug_struct("RuntimeConfig")
192            .field("cache_query_plans", &self.cache_query_plans)
193            .field("max_query_depth", &self.max_query_depth)
194            .field("max_query_complexity", &self.max_query_complexity)
195            .field("enable_tracing", &self.enable_tracing)
196            .field("field_filter", &self.field_filter.is_some())
197            .field("rls_policy", &self.rls_policy.is_some())
198            .field("query_timeout_ms", &self.query_timeout_ms)
199            .field("jsonb_optimization", &self.jsonb_optimization)
200            .field("query_validation", &self.query_validation)
201            .finish()
202    }
203}
204
205impl Default for RuntimeConfig {
206    fn default() -> Self {
207        Self {
208            cache_query_plans:    true,
209            max_query_depth:      10,
210            max_query_complexity: 1000,
211            enable_tracing:       false,
212            field_filter:         None,
213            rls_policy:           None,
214            query_timeout_ms:     30_000, // 30 second default timeout
215            jsonb_optimization:   JsonbOptimizationOptions::default(),
216            query_validation:     None,
217        }
218    }
219}
220
221impl RuntimeConfig {
222    /// Create a new runtime config with a field filter.
223    ///
224    /// # Example
225    ///
226    /// ```
227    /// use fraiseql_core::runtime::RuntimeConfig;
228    /// use fraiseql_core::security::FieldFilterConfig;
229    ///
230    /// let config = RuntimeConfig::default()
231    ///     .with_field_filter(
232    ///         FieldFilterConfig::new()
233    ///             .protect_field("User", "salary")
234    ///             .protect_field("User", "ssn")
235    ///     );
236    /// ```
237    #[must_use]
238    pub fn with_field_filter(mut self, config: FieldFilterConfig) -> Self {
239        self.field_filter = Some(FieldFilter::new(config));
240        self
241    }
242
243    /// Configure row-level security (RLS) policy for access control.
244    ///
245    /// When set, the executor will evaluate the RLS policy before executing queries,
246    /// applying WHERE clause filters based on the user's `SecurityContext`.
247    ///
248    /// # Example
249    ///
250    /// ```rust
251    /// use fraiseql_core::runtime::RuntimeConfig;
252    /// use fraiseql_core::security::DefaultRLSPolicy;
253    /// use std::sync::Arc;
254    ///
255    /// let config = RuntimeConfig::default()
256    ///     .with_rls_policy(Arc::new(DefaultRLSPolicy::new()));
257    /// ```
258    #[must_use]
259    pub fn with_rls_policy(mut self, policy: Arc<dyn RLSPolicy>) -> Self {
260        self.rls_policy = Some(policy);
261        self
262    }
263}
264
265/// Execution context for query cancellation support.
266///
267/// This struct provides a mechanism for gracefully cancelling long-running queries
268/// via cancellation tokens, enabling proper cleanup and error reporting when:
269/// - A client connection closes
270/// - A user explicitly cancels a query
271/// - A system shutdown is initiated
272///
273/// # Example
274///
275/// ```no_run
276/// // Requires: a running tokio runtime and an Executor with a live database adapter.
277/// // See: tests/integration/ for runnable examples.
278/// use fraiseql_core::runtime::ExecutionContext;
279/// use std::time::Duration;
280///
281/// let ctx = ExecutionContext::new("query-123".to_string());
282///
283/// // Spawn a task that cancels after 5 seconds
284/// let cancel_token = ctx.cancellation_token().clone();
285/// tokio::spawn(async move {
286///     tokio::time::sleep(Duration::from_secs(5)).await;
287///     cancel_token.cancel();
288/// });
289///
290/// // Execute query with cancellation support
291/// // let result = executor.execute_with_context(query, None, &ctx).await;
292/// ```
293#[derive(Debug, Clone)]
294pub struct ExecutionContext {
295    /// Unique identifier for tracking the query execution
296    query_id: String,
297
298    /// Cancellation token for gracefully stopping the query
299    /// When cancelled, ongoing query execution should stop and return a Cancelled error
300    token: tokio_util::sync::CancellationToken,
301}
302
303impl ExecutionContext {
304    /// Create a new execution context with a cancellation token.
305    ///
306    /// # Arguments
307    ///
308    /// * `query_id` - Unique identifier for this query execution
309    ///
310    /// # Example
311    ///
312    /// ```rust
313    /// # use fraiseql_core::runtime::ExecutionContext;
314    /// let ctx = ExecutionContext::new("user-query-001".to_string());
315    /// assert_eq!(ctx.query_id(), "user-query-001");
316    /// ```
317    #[must_use]
318    pub fn new(query_id: String) -> Self {
319        Self {
320            query_id,
321            token: tokio_util::sync::CancellationToken::new(),
322        }
323    }
324
325    /// Get the query ID.
326    #[must_use]
327    pub fn query_id(&self) -> &str {
328        &self.query_id
329    }
330
331    /// Get a reference to the cancellation token.
332    ///
333    /// The returned token can be used to:
334    /// - Clone and pass to background tasks
335    /// - Check if cancellation was requested
336    /// - Propagate cancellation through the call stack
337    #[must_use]
338    pub const fn cancellation_token(&self) -> &tokio_util::sync::CancellationToken {
339        &self.token
340    }
341
342    /// Check if cancellation has been requested.
343    #[must_use]
344    pub fn is_cancelled(&self) -> bool {
345        self.token.is_cancelled()
346    }
347}
348
349#[cfg(test)]
350mod tests {
351    use super::*;
352
353    #[test]
354    fn test_default_config() {
355        let config = RuntimeConfig::default();
356        assert!(config.cache_query_plans);
357        assert_eq!(config.max_query_depth, 10);
358        assert_eq!(config.max_query_complexity, 1000);
359        assert!(!config.enable_tracing);
360    }
361}