Skip to main content

sqry_core/query/executor/
core.rs

1//! Query executor core - main `QueryExecutor` struct and orchestration
2//!
3//! # Query Execution Pipeline
4//!
5//! The executor processes queries via `CodeGraph` in three stages:
6//!
7//! 1. **Parse with cache** (`parse_query_ast`):
8//!    - Check AST parse cache for `Arc<ParsedQuery>` (15-20ns cache hit)
9//!    - On miss: parse query (1.6µs) and cache `Arc<ParsedQuery>`
10//!    - Returns `Arc<ParsedQuery>` for zero-copy sharing
11//!
12//! 2. **Graph evaluation** (`execute_on_graph`):
13//!    - Load `CodeGraph` from `.sqry/graph/snapshot.sqry`
14//!    - Evaluate predicates directly on graph nodes
15//!    - Handle relation queries (callers, callees, etc.) via graph edges
16//!
17//! 3. **Results**:
18//!    - Return `QueryResults` with Arc-based accessors
19//!    - Results sorted by file location
20//!
21//! # Arc-based Parse Cache
22//!
23//! The AST parse cache stores `Arc<ParsedQuery>` to enable zero-copy cache hits.
24//! This provides:
25//!
26//! - **100× speedup** for cache hits (1.6µs → 16ns)
27//! - **Thread-safe sharing** of parsed queries
28//! - **Memory efficiency** (one `ParsedQuery` instance per unique query string)
29//!
30//! See [`AstParseCache`](crate::query::cache::AstParseCache) for benchmarks and details.
31
32use super::graph_eval;
33use crate::graph::unified::concurrent::CodeGraph;
34use crate::graph::unified::persistence::load_from_path;
35use crate::normalizer::MetadataNormalizer;
36use crate::plugin::PluginManager;
37use crate::query::cache::{CacheStats, ResultCache};
38use crate::query::pipeline::AggregationResult;
39use crate::query::plan::{CacheStatus, ExecutionStep, QueryPlan};
40use crate::query::results::{JoinResults, QueryOutput, QueryResults};
41use crate::query::types::{Expr, PipelineStage};
42use anyhow::{Result, anyhow};
43use parking_lot::RwLock;
44use std::collections::HashMap;
45use std::path::{Path, PathBuf};
46use std::sync::Arc;
47use std::time::Instant;
48
49/// Thread-safe cache for a loaded code graph keyed by its canonical path.
50///
51/// The Option is None when no graph has been loaded yet. Once loaded, it stores
52/// both the path the graph was loaded from and the graph itself.
53pub(crate) type GraphCache = Arc<RwLock<Option<(PathBuf, Arc<CodeGraph>)>>>;
54
55/// Executes queries against code using `CodeGraph`.
56///
57/// The executor loads a `CodeGraph` from disk and evaluates queries directly
58/// against graph nodes using `execute_on_graph()`.
59pub struct QueryExecutor {
60    pub(crate) plugin_manager: PluginManager,
61
62    /// Thread-safe cache for loaded `CodeGraph`
63    ///
64    /// See [`GraphCache`] type alias for details on the caching strategy.
65    pub(crate) graph_cache: GraphCache,
66
67    /// AST parse cache (query string → `ParsedQuery`) - Boolean parser
68    pub(crate) ast_parse_cache: Arc<crate::query::cache::AstParseCache>,
69
70    /// Result cache for query results
71    pub(crate) result_cache: Arc<ResultCache>,
72
73    /// Disable parallel query execution (for A/B performance testing)
74    pub(crate) disable_parallel: bool,
75
76    /// Validation options for query parsing
77    pub(crate) validation_options: crate::query::validator::ValidationOptions,
78
79    /// Pre-flight cost-gate configuration (per `B_cost_gate.md` §2 +
80    /// `00_contracts.md` §3.CC-2 + §3.CC-3). The gate fires inside
81    /// [`Self::execute_evaluate_with`] before `evaluate_all` so a
82    /// rejected query never enters `spawn_blocking`. Default is the
83    /// documented standalone-MCP / daemon-default config (per
84    /// `00_contracts.md` §3.CC-3 the two are byte-identical on a
85    /// fresh install). Daemon callers override via
86    /// [`Self::with_cost_gate_config`] from the per-workspace
87    /// `DaemonConfig`.
88    pub(crate) cost_gate_config: crate::query::cost_gate::CostGateConfig,
89}
90
91impl QueryExecutor {
92    /// Create a new query executor
93    #[must_use]
94    pub fn new() -> Self {
95        Self {
96            plugin_manager: PluginManager::new(),
97            graph_cache: Arc::new(RwLock::new(None)),
98            ast_parse_cache: Arc::new(crate::query::cache::AstParseCache::new(1000)),
99            result_cache: Arc::new(ResultCache::new(1000)),
100            disable_parallel: false,
101            validation_options: crate::query::validator::ValidationOptions::default(),
102            cost_gate_config: crate::query::cost_gate::CostGateConfig::default(),
103        }
104    }
105
106    /// Create a query executor with a custom plugin manager
107    #[must_use]
108    pub fn with_plugin_manager(plugin_manager: PluginManager) -> Self {
109        Self {
110            plugin_manager,
111            graph_cache: Arc::new(RwLock::new(None)),
112            ast_parse_cache: Arc::new(crate::query::cache::AstParseCache::new(1000)),
113            result_cache: Arc::new(ResultCache::new(1000)),
114            disable_parallel: false,
115            validation_options: crate::query::validator::ValidationOptions::default(),
116            cost_gate_config: crate::query::cost_gate::CostGateConfig::default(),
117        }
118    }
119
120    /// Return the plugin manager used by this executor.
121    #[must_use]
122    pub fn plugin_manager(&self) -> &PluginManager {
123        &self.plugin_manager
124    }
125
126    fn build_registry(&self) -> crate::query::registry::FieldRegistry {
127        let mut registry = crate::query::registry::FieldRegistry::with_core_fields();
128        for plugin in self.plugin_manager.plugins() {
129            let _collisions = registry.add_plugin_fields(plugin.fields());
130        }
131
132        let normalizer = MetadataNormalizer::new();
133        for (short_form, canonical) in normalizer.mappings() {
134            if registry.contains(canonical)
135                && let Some(canonical_field) = registry.get(canonical)
136            {
137                let short_field = crate::query::types::FieldDescriptor {
138                    name: short_form,
139                    field_type: canonical_field.field_type.clone(),
140                    operators: canonical_field.operators,
141                    indexed: canonical_field.indexed,
142                    doc: canonical_field.doc,
143                };
144                registry.add_field(short_field);
145            }
146        }
147
148        registry
149    }
150
151    /// Configure validation options (e.g., fuzzy field tolerance)
152    #[must_use]
153    pub fn with_validation_options(
154        mut self,
155        options: crate::query::validator::ValidationOptions,
156    ) -> Self {
157        self.validation_options = options;
158        self
159    }
160
161    /// Override the pre-flight cost-gate configuration (per
162    /// `B_cost_gate.md` §B6 + `00_contracts.md` §3.CC-3).
163    /// The daemon installs the per-workspace cap derived from
164    /// `DaemonConfig` here; standalone callers can lower the cap
165    /// for stricter rejection or `None` to disable arena-size
166    /// gating entirely (the shape-only checks still run).
167    #[must_use]
168    pub fn with_cost_gate_config(
169        mut self,
170        config: crate::query::cost_gate::CostGateConfig,
171    ) -> Self {
172        self.cost_gate_config = config;
173        self
174    }
175
176    /// Disable parallel query execution (for A/B performance testing)
177    #[must_use]
178    pub fn without_parallel(mut self) -> Self {
179        self.disable_parallel = true;
180        self
181    }
182
183    /// Get or load `CodeGraph` with thread-safe caching
184    ///
185    /// Uses double-checked locking pattern for thread-safe lazy initialization:
186    /// 1. Try cache with read lock - fast path (validates path matches)
187    /// 2. Load from disk with write lock - slow path with double-check
188    ///
189    /// # Path Tracking
190    /// - Cache stores (`PathBuf`, `Arc<CodeGraph>`) to track which directory was loaded
191    /// - If cached path != requested path, cache is invalidated and reloaded
192    ///
193    /// # Errors
194    ///
195    /// Returns an error if loading the graph from disk fails.
196    pub(crate) fn get_or_load_graph(&self, dir: &Path) -> Result<Option<Arc<CodeGraph>>> {
197        // Canonicalize the path for consistent comparisons
198        let canonical_dir = dir.canonicalize().unwrap_or_else(|_| dir.to_path_buf());
199
200        // Fast path: Try cache (read lock - allows concurrent reads)
201        {
202            let cache = self.graph_cache.read();
203            if let Some((cached_path, graph)) = cache.as_ref()
204                && cached_path == &canonical_dir
205            {
206                return Ok(Some(Arc::clone(graph)));
207            }
208            // Path mismatch - cache will be invalidated in slow path
209        }
210
211        // Slow path: Load and cache (write lock - exclusive access)
212        let mut cache = self.graph_cache.write();
213
214        // Double-check: another thread might have loaded while we waited for write lock
215        if let Some((cached_path, graph)) = cache.as_ref() {
216            if cached_path == &canonical_dir {
217                return Ok(Some(Arc::clone(graph)));
218            }
219            // Path mismatch - invalidate cache and reload below
220            log::debug!(
221                "Graph cache invalidated due to path mismatch. Old: {}, New: {}",
222                cached_path.display(),
223                canonical_dir.display()
224            );
225        }
226
227        // Actually load from disk
228        let storage = crate::graph::unified::persistence::GraphStorage::new(&canonical_dir);
229
230        if !storage.exists() {
231            // No manifest → no complete index
232            let auto_index_var = std::env::var("SQRY_AUTO_INDEX").unwrap_or_default();
233            if auto_index_var == "false" || auto_index_var == "0" {
234                *cache = None;
235                return Ok(None);
236            }
237
238            log::info!(
239                "No graph found at {}, auto-building index",
240                canonical_dir.display()
241            );
242            // Release write lock before the heavy build operation
243            drop(cache);
244
245            let config = crate::graph::unified::build::BuildConfig::default();
246            let (graph, _build_result) = crate::graph::unified::build::build_and_persist_graph(
247                &canonical_dir,
248                &self.plugin_manager,
249                &config,
250                "cli:auto_index",
251            )?;
252            let arc_graph = Arc::new(graph);
253
254            let mut cache = self.graph_cache.write();
255            *cache = Some((canonical_dir, Arc::clone(&arc_graph)));
256            return Ok(Some(arc_graph));
257        }
258
259        // Manifest exists → try loading snapshot
260        log::debug!(
261            "Loading CodeGraph from: {}",
262            storage.snapshot_path().display()
263        );
264
265        match load_from_path(storage.snapshot_path(), Some(&self.plugin_manager)) {
266            Ok(graph) => {
267                let arc_graph = Arc::new(graph);
268                *cache = Some((canonical_dir, Arc::clone(&arc_graph)));
269                Ok(Some(arc_graph))
270            }
271            Err(e) => {
272                // Load failed (snapshot missing/corrupt) → auto-rebuild if enabled
273                let auto_index_var = std::env::var("SQRY_AUTO_INDEX").unwrap_or_default();
274                if auto_index_var == "false" || auto_index_var == "0" {
275                    return Err(e.into());
276                }
277                log::warn!("Graph load failed ({e}), auto-rebuilding index");
278                // Release write lock before the heavy rebuild
279                drop(cache);
280
281                let config = crate::graph::unified::build::BuildConfig::default();
282                let (graph, _build_result) = crate::graph::unified::build::build_and_persist_graph(
283                    &canonical_dir,
284                    &self.plugin_manager,
285                    &config,
286                    "cli:auto_index",
287                )?;
288                let arc_graph = Arc::new(graph);
289
290                let mut cache = self.graph_cache.write();
291                *cache = Some((canonical_dir, Arc::clone(&arc_graph)));
292                Ok(Some(arc_graph))
293            }
294        }
295    }
296
297    /// Get cache statistics (for monitoring/debugging)
298    #[must_use]
299    pub fn cache_stats(&self) -> (CacheStats, CacheStats) {
300        (self.ast_parse_cache.stats(), self.result_cache.stats())
301    }
302
303    /// Get query execution plan for --explain
304    ///
305    /// Parses the query and returns detailed execution plan with timing,
306    /// cache status, and optimization information.
307    ///
308    /// # Errors
309    ///
310    /// Returns [`anyhow::Error`] when query parsing, validation, or optimization fails.
311    pub fn get_query_plan(&self, query_str: &str) -> Result<QueryPlan> {
312        let start = Instant::now();
313
314        // Step 1: Parse query (boolean AST)
315        let parse_start = Instant::now();
316
317        let parsed = self.parse_query_ast(query_str)?;
318        let registry = self.build_registry();
319        let optimizer = crate::query::optimizer::Optimizer::new(registry);
320        let optimized_query = optimizer.optimize_query((*parsed.ast).clone());
321
322        let optimized_query_str = format!("{:?}", optimized_query.root);
323        let parse_step_name = "Parse query (boolean)";
324        let steps_prefix = vec![
325            (parse_step_name, 0),
326            ("Validate fields", 0),
327            ("Optimize AST", 0),
328        ];
329
330        // Duration beyond u64::MAX ms (~584 million years) is impossible; clamp to max
331        let parse_time = parse_start
332            .elapsed()
333            .as_millis()
334            .try_into()
335            .unwrap_or(u64::MAX);
336
337        // Get cache status
338        let (parse_stats, result_stats) = self.cache_stats();
339        let cache_status = CacheStatus {
340            parse_cache_hit: parse_stats.hits > 0,
341            result_cache_hit: result_stats.hits > 0,
342        };
343
344        // Build execution steps
345        let mut steps = Vec::new();
346        let mut step_num = 1;
347
348        for (operation, result_count) in steps_prefix {
349            steps.push(ExecutionStep {
350                step_num,
351                operation: operation.to_string(),
352                result_count,
353                time_ms: if step_num == 1 { parse_time } else { 0 },
354            });
355            step_num += 1;
356        }
357
358        // Add graph lookup step
359        steps.push(ExecutionStep {
360            step_num,
361            operation: "CodeGraph lookup".to_string(),
362            result_count: 0,
363            time_ms: 0,
364        });
365
366        // Duration beyond u64::MAX ms is impossible; clamp to max
367        let total_time = start.elapsed().as_millis().try_into().unwrap_or(u64::MAX);
368
369        Ok(QueryPlan::new(
370            query_str.to_string(),
371            optimized_query_str,
372            steps,
373            total_time,
374            true, // Always uses CodeGraph
375            cache_status,
376        ))
377    }
378
379    /// Clear all caches (for testing)
380    #[cfg(test)]
381    pub fn clear_caches(&self) {
382        self.ast_parse_cache.clear();
383        self.result_cache.clear();
384    }
385
386    /// Parse query string using boolean AST parser with caching
387    ///
388    /// This method:
389    /// 1. Checks AST parse cache for existing `ParsedQuery`
390    /// 2. On cache miss: parses query, validates, extracts repo filter, normalizes
391    /// 3. Caches the `ParsedQuery` for future reuse
392    ///
393    /// # Arguments
394    ///
395    /// * `query_str` - Query string to parse (boolean syntax)
396    ///
397    /// # Returns
398    ///
399    /// * `Ok(Arc<ParsedQuery>)` - Cached or freshly parsed query
400    /// * `Err(...)` - Parse error, validation error, or repo filter error
401    ///
402    /// # Performance
403    ///
404    /// - Cache hit: ~15-20ns (Arc clone + hash lookup)
405    /// - Cache miss: ~1.6µs (lex + parse + validate + normalize)
406    ///
407    /// # Example
408    ///
409    /// ```ignore
410    /// let executor = QueryExecutor::new();
411    /// let parsed = executor.parse_query_ast("kind:function AND name:test")?;
412    /// // parsed.ast contains the boolean expression
413    /// // parsed.repo_filter contains any repo: predicates
414    /// // parsed.normalized is the cache key (repo predicates stripped)
415    /// ```
416    ///
417    /// # Errors
418    ///
419    /// Returns [`anyhow::Error`] when parsing, validation, or normalization fails.
420    pub fn parse_query_ast(&self, query_str: &str) -> Result<Arc<crate::query::ParsedQuery>> {
421        // Try cache first
422        if let Some(cached_parsed) = self.ast_parse_cache.get(query_str) {
423            log::trace!("AST parse cache HIT for: {query_str}");
424            return Ok(cached_parsed);
425        }
426
427        log::trace!("AST parse cache MISS, parsing: {query_str}");
428
429        // Parse query using boolean parser
430        let ast = crate::query::parser_new::Parser::parse_query(query_str)
431            .map_err(|err| err.with_source(query_str))?;
432
433        let registry = self.build_registry();
434
435        // Validate AST against registry (with normalization)
436        let validator =
437            crate::query::validator::Validator::with_options(registry, self.validation_options);
438        let mut normalized_ast = ast.clone();
439        normalized_ast.root = match validator.normalize_expr(&ast.root) {
440            Ok(root) => root,
441            Err(validation_err) => {
442                // Wrap normalization error with source context for rich diagnostics
443                let query_error = crate::query::error::QueryError::Validation(validation_err);
444                return Err(query_error.with_source(query_str).into());
445            }
446        };
447        if let Err(validation_err) = validator.validate(&normalized_ast.root) {
448            // Wrap validation error with source context for rich diagnostics
449            let query_error = crate::query::error::QueryError::Validation(validation_err);
450            return Err(query_error.with_source(query_str).into());
451        }
452
453        // Create ParsedQuery (extracts repo filter, normalizes AST)
454        let parsed = crate::query::ParsedQuery::from_ast(Arc::new(normalized_ast))?;
455
456        // Cache the ParsedQuery for future reuse
457        let arc_parsed = Arc::new(parsed);
458        self.ast_parse_cache
459            .insert_arc(query_str.to_string(), Arc::clone(&arc_parsed));
460
461        Ok(arc_parsed)
462    }
463}
464
465impl Default for QueryExecutor {
466    fn default() -> Self {
467        Self::new()
468    }
469}
470
471impl QueryExecutor {
472    /// Execute query using `CodeGraph`.
473    ///
474    /// Loads the `CodeGraph` from disk (or cache) and evaluates the query
475    /// directly against graph nodes.
476    ///
477    /// # Arguments
478    ///
479    /// * `query` - The query string to parse and execute
480    /// * `path` - Directory path containing the `.sqry/graph/snapshot.sqry` file
481    ///
482    /// # Returns
483    ///
484    /// `QueryResults` containing matched `NodeId`s with Arc-based accessors.
485    ///
486    /// # Errors
487    ///
488    /// Returns an error if:
489    /// - No graph exists at the path (run `sqry index` first)
490    /// - Query parsing fails
491    /// - Predicate evaluation fails
492    ///
493    /// # Example
494    ///
495    /// ```ignore
496    /// let executor = QueryExecutor::new();
497    /// let results = executor.execute_on_graph("kind:function", Path::new("/my/project"))?;
498    /// for m in results.iter() {
499    ///     println!("{}: {}", m.kind().as_str(), m.name().unwrap_or_default());
500    /// }
501    /// ```
502    pub fn execute_on_graph(&self, query: &str, path: &Path) -> Result<QueryResults> {
503        self.execute_on_graph_with_variables(query, path, None)
504    }
505
506    /// Cancellable variant of [`Self::execute_on_graph`].
507    ///
508    /// The supplied `cancel` token is polled by the inner evaluator
509    /// (per-batch in `evaluate_all`, per-iteration in the rayon path
510    /// — see `A_cancellation.md` §3 + `00_contracts.md` §3.CC-1). The
511    /// non-cancellable variant is preserved for LSP/CLI call sites
512    /// that have no per-request token to plumb.
513    ///
514    /// # Errors
515    ///
516    /// As [`Self::execute_on_graph`], plus [`crate::query::error::QueryError::Cancelled`]
517    /// (wrapped in `anyhow::Error`) when the token is observed cancelled
518    /// before the evaluator returns its match list.
519    pub fn execute_on_graph_cancellable(
520        &self,
521        query: &str,
522        path: &Path,
523        cancel: &crate::query::cancellation::CancellationToken,
524    ) -> Result<QueryResults> {
525        self.execute_on_graph_with_variables_cancellable(query, path, None, cancel)
526    }
527
528    /// Execute query with variable substitution.
529    ///
530    /// Variables in the query (e.g., `$type`) are replaced with values from
531    /// the provided map before evaluation.
532    ///
533    /// # Errors
534    ///
535    /// Returns an error if graph loading, query parsing, variable resolution,
536    /// or predicate evaluation fails.
537    pub fn execute_on_graph_with_variables(
538        &self,
539        query: &str,
540        path: &Path,
541        variables: Option<&HashMap<String, String>>,
542    ) -> Result<QueryResults> {
543        // Backward-compatible: callers that don't supply a token get a
544        // never-cancelled one. Cost is one Arc<AtomicBool> alloc per
545        // call, O(1) — see `A_cancellation.md` §3.
546        self.execute_on_graph_with_variables_cancellable(
547            query,
548            path,
549            variables,
550            &crate::query::cancellation::CancellationToken::new(),
551        )
552    }
553
554    /// Cancellable variant of [`Self::execute_on_graph_with_variables`].
555    /// See [`Self::execute_on_graph_cancellable`] for the cancellation
556    /// contract.
557    ///
558    /// # Errors
559    ///
560    /// As [`Self::execute_on_graph_with_variables`], plus
561    /// [`crate::query::error::QueryError::Cancelled`] (wrapped in
562    /// `anyhow::Error`).
563    pub fn execute_on_graph_with_variables_cancellable(
564        &self,
565        query: &str,
566        path: &Path,
567        variables: Option<&HashMap<String, String>>,
568        cancel: &crate::query::cancellation::CancellationToken,
569    ) -> Result<QueryResults> {
570        let budget = crate::query::budget::QueryBudget::unbounded(cancel.clone());
571        let parsed = self.parse_query_ast(query)?;
572        let graph = self
573            .get_or_load_graph(path)?
574            .ok_or_else(|| anyhow!("No graph found. Run `sqry index {}` first.", path.display()))?;
575
576        let workspace_root = path.canonicalize().unwrap_or_else(|_| path.to_path_buf());
577
578        self.execute_evaluate_with(graph, &parsed, &workspace_root, variables, &budget)
579    }
580
581    /// Cancellable + budgeted variant of
582    /// [`Self::execute_on_graph_with_variables`]. See
583    /// [`Self::execute_on_preloaded_graph_with_budget`] for the
584    /// budget contract.
585    ///
586    /// # Errors
587    ///
588    /// As [`Self::execute_on_graph_with_variables`], plus
589    /// [`crate::query::budget::BudgetExceeded`] when the row
590    /// budget is exceeded.
591    pub fn execute_on_graph_with_variables_with_budget(
592        &self,
593        query: &str,
594        path: &Path,
595        variables: Option<&HashMap<String, String>>,
596        budget: &crate::query::budget::QueryBudget,
597    ) -> Result<QueryResults> {
598        let parsed = self.parse_query_ast(query)?;
599        let graph = self
600            .get_or_load_graph(path)?
601            .ok_or_else(|| anyhow!("No graph found. Run `sqry index {}` first.", path.display()))?;
602
603        let workspace_root = path.canonicalize().unwrap_or_else(|_| path.to_path_buf());
604
605        self.execute_evaluate_with(graph, &parsed, &workspace_root, variables, budget)
606    }
607
608    /// Execute a query against a caller-supplied `CodeGraph`, bypassing the
609    /// graph cache and on-disk loading entirely.
610    ///
611    /// This is the daemon hot-path: the caller already holds the
612    /// workspace-loaded `Arc<CodeGraph>` under a workspace lock, and we must
613    /// NOT re-enter [`Self::get_or_load_graph`] (which would hit disk and
614    /// pollute the executor's process-wide `graph_cache`). Query parsing still
615    /// goes through the AST parse cache; only the graph acquisition path is
616    /// skipped.
617    ///
618    /// The caller-supplied `workspace_root` is canonicalized by this method
619    /// (mirroring [`Self::execute_on_graph_with_variables`]); the caller need
620    /// not pre-canonicalize it. Passing the workspace root is the caller's
621    /// responsibility — it is not derived from the graph.
622    ///
623    /// # Errors
624    ///
625    /// Returns an error if query parsing, variable resolution, or predicate
626    /// evaluation fails. Unlike [`Self::execute_on_graph_with_variables`],
627    /// this method cannot produce a "no graph found" error — the graph is
628    /// always supplied by the caller.
629    pub fn execute_on_preloaded_graph(
630        &self,
631        graph: Arc<CodeGraph>,
632        query: &str,
633        workspace_root: &Path,
634        variables: Option<&HashMap<String, String>>,
635    ) -> Result<QueryResults> {
636        // Backward-compatible: callers that don't supply a token get a
637        // never-cancelled one. Cost is one Arc<AtomicBool> alloc per
638        // call, O(1) — see `A_cancellation.md` §3.
639        self.execute_on_preloaded_graph_cancellable(
640            graph,
641            query,
642            workspace_root,
643            variables,
644            &crate::query::cancellation::CancellationToken::new(),
645        )
646    }
647
648    /// Cancellable variant of [`Self::execute_on_preloaded_graph`].
649    /// See [`Self::execute_on_graph_cancellable`] for the cancellation
650    /// contract; this is the daemon hot-path entrypoint.
651    ///
652    /// # Errors
653    ///
654    /// As [`Self::execute_on_preloaded_graph`], plus
655    /// [`crate::query::error::QueryError::Cancelled`] (wrapped in
656    /// `anyhow::Error`).
657    pub fn execute_on_preloaded_graph_cancellable(
658        &self,
659        graph: Arc<CodeGraph>,
660        query: &str,
661        workspace_root: &Path,
662        variables: Option<&HashMap<String, String>>,
663        cancel: &crate::query::cancellation::CancellationToken,
664    ) -> Result<QueryResults> {
665        // The cancellable variant uses an unbounded budget — only
666        // the cooperative cancellation token is in play. Per-tool
667        // budget callers should use
668        // [`Self::execute_on_preloaded_graph_with_budget`] instead.
669        let budget = crate::query::budget::QueryBudget::unbounded(cancel.clone());
670        let parsed = self.parse_query_ast(query)?;
671        let workspace_root = workspace_root
672            .canonicalize()
673            .unwrap_or_else(|_| workspace_root.to_path_buf());
674
675        self.execute_evaluate_with(graph, &parsed, &workspace_root, variables, &budget)
676    }
677
678    /// Cancellable + budgeted variant of
679    /// [`Self::execute_on_preloaded_graph`]. Plumbs both the
680    /// per-request cancellation token AND the per-tool runtime row
681    /// budget through to the evaluator so the budget's `tick()`
682    /// hook fires inside `evaluate_all` (per `C_budget.md` §3 +
683    /// `00_contracts.md` §3.CC-2).
684    ///
685    /// `budget.cancel` MUST be the same token the wrapper holds for
686    /// deadline-driven cancellation — `QueryBudget` enforces this
687    /// at construction time via [`crate::query::budget::QueryBudget::new`].
688    ///
689    /// # Errors
690    ///
691    /// As [`Self::execute_on_preloaded_graph`], plus
692    /// [`crate::query::budget::BudgetExceeded`] (wrapped in
693    /// `anyhow::Error`) when `budget.examined` reaches `budget.max_rows`.
694    pub fn execute_on_preloaded_graph_with_budget(
695        &self,
696        graph: Arc<CodeGraph>,
697        query: &str,
698        workspace_root: &Path,
699        variables: Option<&HashMap<String, String>>,
700        budget: &crate::query::budget::QueryBudget,
701    ) -> Result<QueryResults> {
702        let parsed = self.parse_query_ast(query)?;
703        let workspace_root = workspace_root
704            .canonicalize()
705            .unwrap_or_else(|_| workspace_root.to_path_buf());
706
707        self.execute_evaluate_with(graph, &parsed, &workspace_root, variables, budget)
708    }
709
710    /// Shared evaluation body for `execute_on_graph_with_variables` and
711    /// `execute_on_preloaded_graph` (and their cancellable overloads).
712    ///
713    /// All public entrypoints differ only in how they obtain the
714    /// `Arc<CodeGraph>`; everything from variable resolution through
715    /// `evaluate_all` + result assembly is identical and lives here to
716    /// guarantee matching semantics.
717    ///
718    /// The `cancel` token is threaded into [`graph_eval::GraphEvalContext`]
719    /// so the inner [`graph_eval::evaluate_all`] hot loop polls it
720    /// cooperatively (per `A_cancellation.md` §3 + `00_contracts.md`
721    /// §3.CC-1). Non-cancellable callers pass a freshly-allocated
722    /// never-cancelled token; the inner loop's `is_cancelled()` poll
723    /// is one cache-warm atomic load per batch and never observes a
724    /// flipped flag in that case.
725    fn execute_evaluate_with(
726        &self,
727        graph: Arc<CodeGraph>,
728        parsed: &crate::query::ParsedQuery,
729        workspace_root: &Path,
730        variables: Option<&HashMap<String, String>>,
731        budget: &crate::query::budget::QueryBudget,
732    ) -> Result<QueryResults> {
733        // Resolve variables if provided. The cost gate inspects the
734        // post-substitution AST shape so a `$var` resolved to
735        // `.*foo.*` is classified as the literal regex, not as a
736        // `Variable` (per `B_cost_gate.md` §2 "Designed shared body").
737        let effective_root = if let Some(vars) = variables {
738            crate::query::types::resolve_variables(&parsed.ast.root, vars)
739                .map_err(|e| anyhow!("Variable resolution error: {e}"))?
740        } else {
741            parsed.ast.root.clone()
742        };
743
744        // Pre-flight cost gate (P0-1 mitigation). Single-point
745        // insertion at the shared body covers all 28 callers across
746        // `execute_on_preloaded_graph` (17), `execute_on_graph` (7),
747        // and `execute_on_graph_with_variables` (4) by construction
748        // — including the previously-missed MCP `hierarchical_search`
749        // path. Runs BEFORE `GraphEvalContext::new` / `evaluate_all`
750        // so the executor never starts a rayon scan on a rejected
751        // query, freeing the spawn_blocking pool from broad-shape
752        // load. See `B_cost_gate.md` §2 + `00_contracts.md` §3.CC-2.
753        crate::query::cost_gate::check_query(
754            &effective_root,
755            graph.node_count(),
756            &self.cost_gate_config,
757        )
758        .map_err(anyhow::Error::from)?;
759
760        // Threading both cancel + budget into the evaluator: the
761        // budget owns the cancellation token internally (per
762        // `C_budget.md` §3 + `00_contracts.md` §3.CC-1), so
763        // `with_budget` installs both atomically.
764        let mut ctx = graph_eval::GraphEvalContext::new(&graph, &self.plugin_manager)
765            .with_workspace_root(workspace_root)
766            .with_parallel_disabled(self.disable_parallel)
767            .with_budget(budget.clone());
768
769        let matches = graph_eval::evaluate_all(&mut ctx, &effective_root)?;
770        let mut results =
771            QueryResults::new(graph, matches).with_workspace_root(workspace_root.to_path_buf());
772        results.sort_by_location();
773        Ok(results)
774    }
775
776    /// Execute a join query, returning matched node pairs.
777    ///
778    /// The query must have a `Join` expression at its root level.
779    ///
780    /// # Errors
781    ///
782    /// Returns an error if graph loading, query parsing, or join evaluation fails.
783    pub fn execute_join(
784        &self,
785        query: &str,
786        path: &Path,
787        variables: Option<&HashMap<String, String>>,
788    ) -> Result<JoinResults> {
789        let parsed = self.parse_query_ast(query)?;
790        let graph = self
791            .get_or_load_graph(path)?
792            .ok_or_else(|| anyhow!("No graph found. Run `sqry index {}` first.", path.display()))?;
793
794        let workspace_root = path.canonicalize().unwrap_or_else(|_| path.to_path_buf());
795
796        // Resolve variables if provided
797        let effective_root = if let Some(vars) = variables {
798            crate::query::types::resolve_variables(&parsed.ast.root, vars)
799                .map_err(|e| anyhow!("Variable resolution error: {e}"))?
800        } else {
801            parsed.ast.root.clone()
802        };
803
804        let Expr::Join(join) = &effective_root else {
805            return Err(anyhow!(
806                "Expected a join expression (e.g., `(kind:function) CALLS (kind:function)`)"
807            ));
808        };
809
810        let ctx = graph_eval::GraphEvalContext::new(&graph, &self.plugin_manager)
811            .with_workspace_root(&workspace_root)
812            .with_parallel_disabled(self.disable_parallel);
813
814        let eval_result = graph_eval::evaluate_join(&ctx, join, None)?;
815        let results = JoinResults::new(
816            graph,
817            eval_result.pairs,
818            join.edge.clone(),
819            eval_result.truncated,
820        )
821        .with_workspace_root(workspace_root);
822        Ok(results)
823    }
824
825    /// Execute a pipeline query (base query + aggregation stages).
826    ///
827    /// # Errors
828    ///
829    /// Returns an error if graph loading, query parsing, or pipeline execution fails.
830    pub fn execute_pipeline(
831        &self,
832        query: &str,
833        stages: &[PipelineStage],
834        path: &Path,
835        variables: Option<&HashMap<String, String>>,
836    ) -> Result<Vec<AggregationResult>> {
837        let results = self.execute_on_graph_with_variables(query, path, variables)?;
838
839        let mut aggregations = Vec::new();
840        for stage in stages {
841            aggregations.push(super::pipeline::execute_pipeline_stage(&results, stage));
842        }
843        Ok(aggregations)
844    }
845
846    /// Execute a full query that may be a regular query, join, or pipeline.
847    ///
848    /// Detects the query type and dispatches to the appropriate executor.
849    ///
850    /// # Errors
851    ///
852    /// Returns an error if graph loading, query parsing, or execution fails.
853    pub fn execute_full(
854        &self,
855        query: &str,
856        path: &Path,
857        variables: Option<&HashMap<String, String>>,
858    ) -> Result<QueryOutput> {
859        let parsed = self.parse_query_ast(query)?;
860
861        // Check for join at the root level
862        if matches!(&parsed.ast.root, Expr::Join(_)) {
863            let join_results = self.execute_join(query, path, variables)?;
864            return Ok(QueryOutput::Join(join_results));
865        }
866
867        // Check for pipeline
868        if let Some(pipeline) = crate::query::parser_new::Parser::parse_pipeline_query(query)
869            .map_err(|err| err.with_source(query))?
870        {
871            let aggregations = self.execute_pipeline(query, &pipeline.stages, path, variables)?;
872            // Return the last aggregation result (chained stages reduce to final result)
873            if let Some(last) = aggregations.into_iter().last() {
874                return Ok(QueryOutput::Aggregation(last));
875            }
876        }
877
878        // Regular query
879        let results = self.execute_on_graph_with_variables(query, path, variables)?;
880        Ok(QueryOutput::Results(results))
881    }
882}