Skip to main content

fraiseql_core/runtime/executor/
core.rs

1//! `Executor<A>` struct definition, constructors, and basic accessors.
2
3use std::{collections::HashMap, sync::Arc};
4
5use moka::sync::Cache as MokaCache;
6
7use super::{
8    QueryType,
9    relay::{RelayDispatch, RelayDispatchImpl},
10};
11use crate::{
12    db::{RelayDatabaseAdapter, traits::DatabaseAdapter, types::PoolMetrics},
13    graphql::ParsedQuery,
14    runtime::{QueryMatcher, QueryPlanner, RuntimeConfig},
15    schema::{CompiledSchema, IntrospectionResponses},
16};
17
18/// Maximum number of distinct query strings whose parsed ASTs are cached in memory.
19///
20/// 1 024 entries covers the full distinct-query vocabulary of any realistic workload.
21/// Each entry holds an `Arc<(QueryType, Option<ParsedQuery>)>` — the AST is shared,
22/// not duplicated.
23const PARSE_CACHE_CAPACITY: u64 = 1_024;
24
25/// Query executor - executes compiled GraphQL queries.
26///
27/// This is the main entry point for runtime query execution.
28/// It coordinates matching, planning, execution, and projection.
29///
30/// # Type Parameters
31///
32/// * `A` - The database adapter type (implements `DatabaseAdapter` trait)
33///
34/// # Ownership and Lifetimes
35///
36/// The executor holds owned references to schema and runtime data, with no borrowed pointers:
37/// - `schema`: Owned `CompiledSchema` (immutable after construction)
38/// - `adapter`: Shared via `Arc<A>` to allow multiple executors/tasks to use the same connection
39///   pool
40/// - `introspection`: Owned cached GraphQL schema responses
41/// - `config`: Owned runtime configuration
42///
43/// **No explicit lifetimes required** - all data is either owned or wrapped in `Arc`,
44/// so the executor can be stored in long-lived structures without lifetime annotations or
45/// borrow-checker issues.
46///
47/// # Concurrency
48///
49/// `Executor<A>` is `Send + Sync` when `A` is `Send + Sync`. It can be safely shared across
50/// threads and tasks without cloning:
51/// ```no_run
52/// // Requires: a live database adapter.
53/// // See: tests/integration/ for runnable examples.
54/// # use std::sync::Arc;
55/// // let executor = Arc::new(Executor::new(schema, adapter));
56/// // Can be cloned into multiple tasks
57/// // let exec_clone = executor.clone();
58/// // tokio::spawn(async move {
59/// //     let result = exec_clone.execute(query, vars).await;
60/// // });
61/// ```
62///
63/// # Query Timeout
64///
65/// Queries are protected by the `query_timeout_ms` configuration in `RuntimeConfig` (default: 30s).
66/// When a query exceeds this timeout, it returns `FraiseQLError::Timeout` without panicking.
67/// Set `query_timeout_ms` to 0 to disable timeout enforcement.
68pub struct Executor<A: DatabaseAdapter> {
69    /// Compiled schema with optimized SQL templates
70    pub(super) schema: CompiledSchema,
71
72    /// Shared database adapter for query execution
73    /// Wrapped in Arc to allow multiple executors to use the same connection pool
74    pub(super) adapter: Arc<A>,
75
76    /// Type-erased relay capability slot.
77    ///
78    /// `Some` when the executor was constructed via `new_with_relay` (requires
79    /// `A: RelayDatabaseAdapter`).  `None` causes relay queries to return a
80    /// `FraiseQLError::Validation` — no `unreachable!()`, no capability flag.
81    pub(super) relay: Option<Arc<dyn RelayDispatch>>,
82
83    /// Query matching engine (stateless)
84    pub(super) matcher: QueryMatcher,
85
86    /// Query execution planner (stateless)
87    pub(super) planner: QueryPlanner,
88
89    /// Runtime configuration (timeouts, complexity limits, etc.)
90    pub(super) config: RuntimeConfig,
91
92    /// Pre-built introspection responses cached for `__schema` and `__type` queries
93    /// Avoids recomputing schema introspection on every request
94    pub(super) introspection: IntrospectionResponses,
95
96    /// O(1) lookup index for Relay `node(id)` queries: maps `return_type → sql_source`.
97    ///
98    /// Built once at `Executor::with_config()` from the compiled schema, so every
99    /// `execute_node_query()` call is a single `HashMap::get()` rather than an O(N)
100    /// linear scan over `schema.queries`.
101    pub(super) node_type_index: HashMap<String, Arc<str>>,
102
103    /// Parsed GraphQL AST cache, keyed by xxHash64 of the query string.
104    ///
105    /// Repeated identical queries skip re-parsing entirely — a lock-free moka hit
106    /// instead of a full lexer + recursive-descent parse.  No TTL: parsed ASTs are
107    /// immutable and deterministic; the same query string always produces the same result.
108    /// Only successful parses are stored; errors are never cached.
109    pub(super) parse_cache: MokaCache<u64, Arc<(QueryType, Option<ParsedQuery>)>>,
110
111    /// Optional executor-level response cache.
112    ///
113    /// When enabled, caches the final projected GraphQL response (after RBAC,
114    /// projection, envelope wrapping) to skip all redundant work on cache hits.
115    /// Keyed by `(query_cache_key, security_context_hash)`.
116    pub(super) response_cache: Option<Arc<crate::cache::ResponseCache>>,
117}
118
119impl<A: DatabaseAdapter> Executor<A> {
120    /// Create new executor.
121    ///
122    /// # Arguments
123    ///
124    /// * `schema` - Compiled schema
125    /// * `adapter` - Database adapter
126    ///
127    /// # Example
128    ///
129    /// ```no_run
130    /// // Requires: a live PostgreSQL database.
131    /// // See: tests/integration/ for runnable examples.
132    /// # use fraiseql_core::schema::CompiledSchema;
133    /// # use fraiseql_core::db::postgres::PostgresAdapter;
134    /// # use fraiseql_core::runtime::Executor;
135    /// # use std::sync::Arc;
136    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
137    /// # let schema_json = r#"{"types":[],"queries":[]}"#;
138    /// # let connection_string = "postgresql://localhost/mydb";
139    /// let schema = CompiledSchema::from_json(schema_json)?;
140    /// let adapter = PostgresAdapter::new(connection_string).await?;
141    /// let executor = Executor::new(schema, Arc::new(adapter));
142    /// # Ok(()) }
143    /// ```
144    #[must_use]
145    pub fn new(schema: CompiledSchema, adapter: Arc<A>) -> Self {
146        Self::with_config(schema, adapter, RuntimeConfig::default())
147    }
148
149    /// Create new executor with custom configuration.
150    ///
151    /// # Arguments
152    ///
153    /// * `schema` - Compiled schema
154    /// * `adapter` - Database adapter
155    /// * `config` - Runtime configuration
156    #[must_use]
157    pub fn with_config(schema: CompiledSchema, adapter: Arc<A>, config: RuntimeConfig) -> Self {
158        let matcher = QueryMatcher::new(schema.clone());
159        let planner = QueryPlanner::new(config.cache_query_plans);
160        // Build introspection responses at startup (zero-cost at runtime)
161        let introspection = IntrospectionResponses::build(&schema);
162
163        // Build O(1) node-type index: return_type → sql_source.
164        // The first query with a matching return_type and a non-None sql_source wins
165        // (consistent with the previous linear-scan behaviour).
166        let mut node_type_index: HashMap<String, Arc<str>> = HashMap::new();
167        for q in &schema.queries {
168            if let Some(src) = q.sql_source.as_deref() {
169                node_type_index.entry(q.return_type.clone()).or_insert_with(|| Arc::from(src));
170            }
171        }
172
173        Self {
174            schema,
175            adapter,
176            relay: None,
177            matcher,
178            planner,
179            config,
180            introspection,
181            node_type_index,
182            parse_cache: MokaCache::new(PARSE_CACHE_CAPACITY),
183            response_cache: None,
184        }
185    }
186
187    /// Return current connection pool metrics from the underlying database adapter.
188    ///
189    /// Values are sampled live on each call — not cached — so callers (e.g., the
190    /// `/metrics` endpoint) always observe up-to-date pool health.
191    pub fn pool_metrics(&self) -> PoolMetrics {
192        self.adapter.pool_metrics()
193    }
194
195    /// Get the compiled schema.
196    #[must_use]
197    pub const fn schema(&self) -> &CompiledSchema {
198        &self.schema
199    }
200
201    /// Get runtime configuration.
202    #[must_use]
203    pub const fn config(&self) -> &RuntimeConfig {
204        &self.config
205    }
206
207    /// Get database adapter reference.
208    #[must_use]
209    pub const fn adapter(&self) -> &Arc<A> {
210        &self.adapter
211    }
212
213    /// Return the number of entries currently held in the parsed-query AST cache.
214    ///
215    /// Exposed for testing only — callers outside `#[cfg(test)]` code should not
216    /// rely on the exact count, which may lag by one maintenance cycle in moka.
217    #[cfg(test)]
218    #[must_use]
219    pub fn parse_cache_entry_count(&self) -> u64 {
220        self.parse_cache.entry_count()
221    }
222
223    /// Attach an executor-level response cache.
224    ///
225    /// When enabled, the executor caches the final projected response
226    /// (after RBAC, projection, and envelope wrapping) to skip all
227    /// redundant work on cache hits.
228    #[must_use]
229    pub fn with_response_cache(mut self, cache: Arc<crate::cache::ResponseCache>) -> Self {
230        self.response_cache = Some(cache);
231        self
232    }
233
234    /// Get response cache reference (if configured).
235    #[must_use]
236    pub const fn response_cache(&self) -> Option<&Arc<crate::cache::ResponseCache>> {
237        self.response_cache.as_ref()
238    }
239}
240
241impl<A: DatabaseAdapter + RelayDatabaseAdapter + 'static> Executor<A> {
242    /// Create a new executor with relay cursor pagination enabled.
243    ///
244    /// Only callable when `A: RelayDatabaseAdapter`.  The relay capability is
245    /// encoded once at construction time as a type-erased `Arc<dyn RelayDispatch>`,
246    /// so there is no per-query overhead beyond an `Option::is_some()` check.
247    ///
248    /// # Example
249    ///
250    /// ```no_run
251    /// // Requires: a live PostgreSQL database with relay support.
252    /// // See: tests/integration/ for runnable examples.
253    /// # use fraiseql_core::schema::CompiledSchema;
254    /// # use fraiseql_core::db::postgres::PostgresAdapter;
255    /// # use fraiseql_core::runtime::Executor;
256    /// # use std::sync::Arc;
257    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
258    /// # let connection_string = "postgresql://localhost/mydb";
259    /// # let schema: CompiledSchema = panic!("example");
260    /// let adapter = PostgresAdapter::new(connection_string).await?;
261    /// let executor = Executor::new_with_relay(schema, Arc::new(adapter));
262    /// # Ok(()) }
263    /// ```
264    #[must_use]
265    pub fn new_with_relay(schema: CompiledSchema, adapter: Arc<A>) -> Self {
266        Self::with_config_and_relay(schema, adapter, RuntimeConfig::default())
267    }
268
269    /// Create a new executor with relay support and custom configuration.
270    #[must_use]
271    pub fn with_config_and_relay(
272        schema: CompiledSchema,
273        adapter: Arc<A>,
274        config: RuntimeConfig,
275    ) -> Self {
276        let relay: Arc<dyn RelayDispatch> = Arc::new(RelayDispatchImpl(adapter.clone()));
277        let matcher = QueryMatcher::new(schema.clone());
278        let planner = QueryPlanner::new(config.cache_query_plans);
279        let introspection = IntrospectionResponses::build(&schema);
280
281        let mut node_type_index: HashMap<String, Arc<str>> = HashMap::new();
282        for q in &schema.queries {
283            if let Some(src) = q.sql_source.as_deref() {
284                node_type_index.entry(q.return_type.clone()).or_insert_with(|| Arc::from(src));
285            }
286        }
287
288        Self {
289            schema,
290            adapter,
291            relay: Some(relay),
292            matcher,
293            planner,
294            config,
295            introspection,
296            node_type_index,
297            parse_cache: MokaCache::new(PARSE_CACHE_CAPACITY),
298            response_cache: None,
299        }
300    }
301}