Skip to main content

fraiseql_core/runtime/
mod.rs

1//! Runtime query executor - executes compiled queries.
2//!
3//! # Architecture
4//!
5//! The runtime loads a CompiledSchema and executes incoming GraphQL queries by:
6//! 1. Parsing the GraphQL query
7//! 2. Matching it to a compiled query template
8//! 3. Binding variables
9//! 4. Executing the pre-compiled SQL
10//! 5. Projecting JSONB results to GraphQL response
11//!
12//! # Key Concepts
13//!
14//! - **Zero runtime compilation**: All SQL is pre-compiled
15//! - **Pattern matching**: Match incoming query structure to templates
16//! - **Variable binding**: Safe parameter substitution
17//! - **Result projection**: JSONB → GraphQL JSON transformation
18//!
19//! # Example
20//!
21//! ```ignore
22//! use fraiseql_core::runtime::Executor;
23//! use fraiseql_core::schema::CompiledSchema;
24//!
25//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
26//! // Load compiled schema
27//! let schema = CompiledSchema::from_json(include_str!("schema.compiled.json"))?;
28//!
29//! // Create executor (connects to database)
30//! let executor = Executor::new(schema, db_pool).await?;
31//!
32//! // Execute GraphQL query
33//! let query = r#"query { users { id name } }"#;
34//! let result = executor.execute(query, None).await?;
35//!
36//! println!("{}", result);
37//! # Ok(())
38//! # }
39//! ```
40
41mod aggregate_parser;
42mod aggregate_projector;
43pub mod aggregation;
44mod executor;
45pub mod field_filter;
46pub mod input_validator;
47pub mod jsonb_strategy;
48mod matcher;
49pub mod mutation_result;
50mod planner;
51mod projection;
52pub mod query_tracing;
53pub mod sql_logger;
54pub mod subscription;
55pub mod tenant_enforcer;
56pub mod window;
57mod window_parser;
58mod window_projector;
59
60use std::sync::Arc;
61
62pub use aggregate_parser::AggregateQueryParser;
63pub use aggregate_projector::AggregationProjector;
64pub use aggregation::{AggregationSql, AggregationSqlGenerator};
65pub use executor::Executor;
66pub use field_filter::{can_access_field, filter_fields};
67pub use jsonb_strategy::{JsonbOptimizationOptions, JsonbStrategy};
68pub use matcher::{QueryMatch, QueryMatcher};
69pub use planner::{ExecutionPlan, QueryPlanner};
70pub use projection::{FieldMapping, ProjectionMapper, ResultProjector};
71pub use query_tracing::{
72    QueryExecutionTrace, QueryPhaseSpan, QueryTraceBuilder, create_phase_span, create_query_span,
73};
74pub use sql_logger::{SqlOperation, SqlQueryLog, SqlQueryLogBuilder, create_sql_span};
75pub use subscription::{
76    ActiveSubscription, DeliveryResult, KafkaAdapter, KafkaConfig, KafkaMessage, SubscriptionError,
77    SubscriptionEvent, SubscriptionId, SubscriptionManager, SubscriptionOperation,
78    SubscriptionPayload, TransportAdapter, TransportManager, WebhookAdapter, WebhookConfig,
79    WebhookPayload, protocol,
80};
81pub use tenant_enforcer::TenantEnforcer;
82pub use window::{WindowSql, WindowSqlGenerator};
83pub use window_parser::WindowQueryParser;
84pub use window_projector::WindowProjector;
85
86use crate::security::{FieldFilter, FieldFilterConfig, RLSPolicy};
87
88/// Runtime configuration.
89pub struct RuntimeConfig {
90    /// Enable query plan caching.
91    pub cache_query_plans: bool,
92
93    /// Maximum query depth (prevents deeply nested queries).
94    pub max_query_depth: usize,
95
96    /// Maximum query complexity score.
97    pub max_query_complexity: usize,
98
99    /// Enable performance tracing.
100    pub enable_tracing: bool,
101
102    /// Optional field filter for access control.
103    /// When set, validates that users have required scopes to access fields.
104    pub field_filter: Option<FieldFilter>,
105
106    /// Optional row-level security (RLS) policy.
107    /// When set, evaluates access rules based on SecurityContext to determine
108    /// what rows a user can access (e.g., tenant isolation, owner-based access).
109    pub rls_policy: Option<Arc<dyn RLSPolicy>>,
110
111    /// Query timeout in milliseconds (0 = no timeout).
112    pub query_timeout_ms: u64,
113
114    /// JSONB field optimization strategy options
115    pub jsonb_optimization: JsonbOptimizationOptions,
116}
117
118impl std::fmt::Debug for RuntimeConfig {
119    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120        f.debug_struct("RuntimeConfig")
121            .field("cache_query_plans", &self.cache_query_plans)
122            .field("max_query_depth", &self.max_query_depth)
123            .field("max_query_complexity", &self.max_query_complexity)
124            .field("enable_tracing", &self.enable_tracing)
125            .field("field_filter", &self.field_filter.is_some())
126            .field("rls_policy", &self.rls_policy.is_some())
127            .field("query_timeout_ms", &self.query_timeout_ms)
128            .field("jsonb_optimization", &self.jsonb_optimization)
129            .finish()
130    }
131}
132
133impl Default for RuntimeConfig {
134    fn default() -> Self {
135        Self {
136            cache_query_plans:    true,
137            max_query_depth:      10,
138            max_query_complexity: 1000,
139            enable_tracing:       false,
140            field_filter:         None,
141            rls_policy:           None,
142            query_timeout_ms:     30_000, // 30 second default timeout
143            jsonb_optimization:   JsonbOptimizationOptions::default(),
144        }
145    }
146}
147
148impl RuntimeConfig {
149    /// Create a new runtime config with a field filter.
150    ///
151    /// # Example
152    ///
153    /// ```
154    /// use fraiseql_core::runtime::RuntimeConfig;
155    /// use fraiseql_core::security::FieldFilterConfig;
156    ///
157    /// let config = RuntimeConfig::default()
158    ///     .with_field_filter(
159    ///         FieldFilterConfig::new()
160    ///             .protect_field("User", "salary")
161    ///             .protect_field("User", "ssn")
162    ///     );
163    /// ```
164    #[must_use]
165    pub fn with_field_filter(mut self, config: FieldFilterConfig) -> Self {
166        self.field_filter = Some(FieldFilter::new(config));
167        self
168    }
169
170    /// Configure row-level security (RLS) policy for access control.
171    ///
172    /// When set, the executor will evaluate the RLS policy before executing queries,
173    /// applying WHERE clause filters based on the user's SecurityContext.
174    ///
175    /// # Example
176    ///
177    /// ```ignore
178    /// use fraiseql_core::runtime::RuntimeConfig;
179    /// use fraiseql_core::security::DefaultRLSPolicy;
180    /// use std::sync::Arc;
181    ///
182    /// let config = RuntimeConfig::default()
183    ///     .with_rls_policy(Arc::new(DefaultRLSPolicy::new()));
184    /// ```
185    #[must_use]
186    pub fn with_rls_policy(mut self, policy: Arc<dyn RLSPolicy>) -> Self {
187        self.rls_policy = Some(policy);
188        self
189    }
190}
191
192/// Execution context for query cancellation support.
193///
194/// This struct provides a mechanism for gracefully cancelling long-running queries
195/// via cancellation tokens, enabling proper cleanup and error reporting when:
196/// - A client connection closes
197/// - A user explicitly cancels a query
198/// - A system shutdown is initiated
199///
200/// # Example
201///
202/// ```ignore
203/// use fraiseql_core::runtime::ExecutionContext;
204/// use tokio_util::sync::CancellationToken;
205///
206/// let token = CancellationToken::new();
207/// let ctx = ExecutionContext::new("query-123".to_string(), token);
208///
209/// // Spawn a task that cancels after 5 seconds
210/// let cancel_token = ctx.cancellation_token().clone();
211/// tokio::spawn(async move {
212///     tokio::time::sleep(Duration::from_secs(5)).await;
213///     cancel_token.cancel();
214/// });
215///
216/// // Execute query with cancellation support
217/// let result = executor.execute_with_context(query, Some(&ctx)).await;
218/// ```
219#[derive(Debug, Clone)]
220pub struct ExecutionContext {
221    /// Unique identifier for tracking the query execution
222    query_id: String,
223
224    /// Cancellation token for gracefully stopping the query
225    /// When cancelled, ongoing query execution should stop and return a Cancelled error
226    token: tokio_util::sync::CancellationToken,
227}
228
229impl ExecutionContext {
230    /// Create a new execution context with a cancellation token.
231    ///
232    /// # Arguments
233    ///
234    /// * `query_id` - Unique identifier for this query execution
235    ///
236    /// # Example
237    ///
238    /// ```ignore
239    /// let ctx = ExecutionContext::new("user-query-001".to_string());
240    /// ```
241    #[must_use]
242    pub fn new(query_id: String) -> Self {
243        Self {
244            query_id,
245            token: tokio_util::sync::CancellationToken::new(),
246        }
247    }
248
249    /// Get the query ID.
250    #[must_use]
251    pub fn query_id(&self) -> &str {
252        &self.query_id
253    }
254
255    /// Get a reference to the cancellation token.
256    ///
257    /// The returned token can be used to:
258    /// - Clone and pass to background tasks
259    /// - Check if cancellation was requested
260    /// - Propagate cancellation through the call stack
261    #[must_use]
262    pub fn cancellation_token(&self) -> &tokio_util::sync::CancellationToken {
263        &self.token
264    }
265
266    /// Check if cancellation has been requested.
267    #[must_use]
268    pub fn is_cancelled(&self) -> bool {
269        self.token.is_cancelled()
270    }
271}
272
#[cfg(test)]
mod tests {
    use super::*;

    /// Every default that can be compared without extra trait bounds is pinned,
    /// including the security hooks (off) and the 30 second timeout.
    #[test]
    fn test_default_config() {
        let config = RuntimeConfig::default();
        assert!(config.cache_query_plans);
        assert_eq!(config.max_query_depth, 10);
        assert_eq!(config.max_query_complexity, 1000);
        assert!(!config.enable_tracing);
        assert!(config.field_filter.is_none());
        assert!(config.rls_policy.is_none());
        assert_eq!(config.query_timeout_ms, 30_000);
    }

    /// The manual `Debug` impl prints presence flags for the security hooks.
    #[test]
    fn test_debug_reports_security_hooks_as_flags() {
        let rendered = format!("{:?}", RuntimeConfig::default());
        assert!(rendered.contains("field_filter: false"));
        assert!(rendered.contains("rls_policy: false"));
    }

    /// The builder installs a filter and leaves the other settings untouched.
    #[test]
    fn test_with_field_filter_sets_filter() {
        let config = RuntimeConfig::default().with_field_filter(FieldFilterConfig::new());
        assert!(config.field_filter.is_some());
        assert!(config.rls_policy.is_none());
        assert_eq!(config.query_timeout_ms, 30_000);
    }

    /// A new context is not cancelled; cancelling through the token is visible
    /// on the context and on its clones.
    #[test]
    fn test_execution_context_cancellation() {
        let ctx = ExecutionContext::new("query-1".to_string());
        assert_eq!(ctx.query_id(), "query-1");
        assert!(!ctx.is_cancelled());

        let observer = ctx.clone();
        ctx.cancellation_token().cancel();
        assert!(ctx.is_cancelled());
        assert!(observer.is_cancelled());
    }
}
285}