sqry_core/query/executor/core.rs
1//! Query executor core - main `QueryExecutor` struct and orchestration
2//!
3//! # Query Execution Pipeline
4//!
5//! The executor processes queries via `CodeGraph` in three stages:
6//!
7//! 1. **Parse with cache** (`parse_query_ast`):
8//! - Check AST parse cache for `Arc<ParsedQuery>` (15-20ns cache hit)
9//! - On miss: parse query (1.6µs) and cache `Arc<ParsedQuery>`
10//! - Returns `Arc<ParsedQuery>` for zero-copy sharing
11//!
12//! 2. **Graph evaluation** (`execute_on_graph`):
13//! - Load `CodeGraph` from `.sqry/graph/snapshot.sqry`
14//! - Evaluate predicates directly on graph nodes
15//! - Handle relation queries (callers, callees, etc.) via graph edges
16//!
17//! 3. **Results**:
18//! - Return `QueryResults` with Arc-based accessors
19//! - Results sorted by file location
20//!
21//! # Arc-based Parse Cache
22//!
23//! The AST parse cache stores `Arc<ParsedQuery>` to enable zero-copy cache hits.
24//! This provides:
25//!
26//! - **100× speedup** for cache hits (1.6µs → 16ns)
27//! - **Thread-safe sharing** of parsed queries
28//! - **Memory efficiency** (one `ParsedQuery` instance per unique query string)
29//!
30//! See [`AstParseCache`](crate::query::cache::AstParseCache) for benchmarks and details.
31
32use super::graph_eval;
33use crate::graph::unified::concurrent::CodeGraph;
34use crate::graph::unified::persistence::load_from_path;
35use crate::normalizer::MetadataNormalizer;
36use crate::plugin::PluginManager;
37use crate::query::cache::{CacheStats, ResultCache};
38use crate::query::pipeline::AggregationResult;
39use crate::query::plan::{CacheStatus, ExecutionStep, QueryPlan};
40use crate::query::results::{JoinResults, QueryOutput, QueryResults};
41use crate::query::types::{Expr, PipelineStage};
42use anyhow::{Result, anyhow};
43use parking_lot::RwLock;
44use std::collections::HashMap;
45use std::path::{Path, PathBuf};
46use std::sync::Arc;
47use std::time::Instant;
48
/// Thread-safe cache for a loaded code graph keyed by its canonical path.
///
/// The Option is None when no graph has been loaded yet. Once loaded, it stores
/// both the path the graph was loaded from and the graph itself.
///
/// Only one graph is held at a time: when `QueryExecutor::get_or_load_graph`
/// is asked for a different directory, the cached entry is replaced. The lock
/// is a `parking_lot::RwLock`, so cache hits only take a shared read lock and
/// hand out a cheap `Arc` clone of the graph.
pub(crate) type GraphCache = Arc<RwLock<Option<(PathBuf, Arc<CodeGraph>)>>>;
54
/// Executes queries against code using `CodeGraph`.
///
/// The executor loads a `CodeGraph` from disk and evaluates queries directly
/// against graph nodes using `execute_on_graph()`.
///
/// Construct with `new` / `with_plugin_manager`, then adjust with the
/// consuming builder methods (`with_validation_options`,
/// `with_cost_gate_config`, `without_parallel`).
pub struct QueryExecutor {
    /// Plugins whose fields are merged into the query field registry and
    /// which are handed to graph loading, auto-index builds, and evaluation.
    pub(crate) plugin_manager: PluginManager,

    /// Thread-safe cache for loaded `CodeGraph`
    ///
    /// See [`GraphCache`] type alias for details on the caching strategy.
    pub(crate) graph_cache: GraphCache,

    /// AST parse cache (query string → `ParsedQuery`) - Boolean parser
    ///
    /// Keyed by the raw query string; hits return a shared `Arc<ParsedQuery>`.
    pub(crate) ast_parse_cache: Arc<crate::query::cache::AstParseCache>,

    /// Result cache for query results
    ///
    /// NOTE(review): the execution paths in this module never read or write
    /// this cache; here it is only observed via `cache_stats` / `clear_caches`.
    /// Confirm whether another module populates it.
    pub(crate) result_cache: Arc<ResultCache>,

    /// Disable parallel query execution (for A/B performance testing)
    ///
    /// Forwarded to `GraphEvalContext::with_parallel_disabled`.
    pub(crate) disable_parallel: bool,

    /// Validation options for query parsing
    ///
    /// Passed to `Validator::with_options` whenever a query misses the parse
    /// cache.
    pub(crate) validation_options: crate::query::validator::ValidationOptions,

    /// Pre-flight cost-gate configuration (per `B_cost_gate.md` §2 +
    /// `00_contracts.md` §3.CC-2 + §3.CC-3). The gate fires inside
    /// [`Self::execute_evaluate_with`] before `evaluate_all` so a
    /// rejected query never enters `spawn_blocking`. Default is the
    /// documented standalone-MCP / daemon-default config (per
    /// `00_contracts.md` §3.CC-3 the two are byte-identical on a
    /// fresh install). Daemon callers override via
    /// [`Self::with_cost_gate_config`] from the per-workspace
    /// `DaemonConfig`.
    pub(crate) cost_gate_config: crate::query::cost_gate::CostGateConfig,
}
90
91impl QueryExecutor {
92 /// Create a new query executor
93 #[must_use]
94 pub fn new() -> Self {
95 Self {
96 plugin_manager: PluginManager::new(),
97 graph_cache: Arc::new(RwLock::new(None)),
98 ast_parse_cache: Arc::new(crate::query::cache::AstParseCache::new(1000)),
99 result_cache: Arc::new(ResultCache::new(1000)),
100 disable_parallel: false,
101 validation_options: crate::query::validator::ValidationOptions::default(),
102 cost_gate_config: crate::query::cost_gate::CostGateConfig::default(),
103 }
104 }
105
106 /// Create a query executor with a custom plugin manager
107 #[must_use]
108 pub fn with_plugin_manager(plugin_manager: PluginManager) -> Self {
109 Self {
110 plugin_manager,
111 graph_cache: Arc::new(RwLock::new(None)),
112 ast_parse_cache: Arc::new(crate::query::cache::AstParseCache::new(1000)),
113 result_cache: Arc::new(ResultCache::new(1000)),
114 disable_parallel: false,
115 validation_options: crate::query::validator::ValidationOptions::default(),
116 cost_gate_config: crate::query::cost_gate::CostGateConfig::default(),
117 }
118 }
119
120 /// Return the plugin manager used by this executor.
121 #[must_use]
122 pub fn plugin_manager(&self) -> &PluginManager {
123 &self.plugin_manager
124 }
125
126 fn build_registry(&self) -> crate::query::registry::FieldRegistry {
127 let mut registry = crate::query::registry::FieldRegistry::with_core_fields();
128 for plugin in self.plugin_manager.plugins() {
129 let _collisions = registry.add_plugin_fields(plugin.fields());
130 }
131
132 let normalizer = MetadataNormalizer::new();
133 for (short_form, canonical) in normalizer.mappings() {
134 if registry.contains(canonical)
135 && let Some(canonical_field) = registry.get(canonical)
136 {
137 let short_field = crate::query::types::FieldDescriptor {
138 name: short_form,
139 field_type: canonical_field.field_type.clone(),
140 operators: canonical_field.operators,
141 indexed: canonical_field.indexed,
142 doc: canonical_field.doc,
143 };
144 registry.add_field(short_field);
145 }
146 }
147
148 registry
149 }
150
151 /// Configure validation options (e.g., fuzzy field tolerance)
152 #[must_use]
153 pub fn with_validation_options(
154 mut self,
155 options: crate::query::validator::ValidationOptions,
156 ) -> Self {
157 self.validation_options = options;
158 self
159 }
160
161 /// Override the pre-flight cost-gate configuration (per
162 /// `B_cost_gate.md` §B6 + `00_contracts.md` §3.CC-3).
163 /// The daemon installs the per-workspace cap derived from
164 /// `DaemonConfig` here; standalone callers can lower the cap
165 /// for stricter rejection or `None` to disable arena-size
166 /// gating entirely (the shape-only checks still run).
167 #[must_use]
168 pub fn with_cost_gate_config(
169 mut self,
170 config: crate::query::cost_gate::CostGateConfig,
171 ) -> Self {
172 self.cost_gate_config = config;
173 self
174 }
175
176 /// Disable parallel query execution (for A/B performance testing)
177 #[must_use]
178 pub fn without_parallel(mut self) -> Self {
179 self.disable_parallel = true;
180 self
181 }
182
183 /// Get or load `CodeGraph` with thread-safe caching
184 ///
185 /// Uses double-checked locking pattern for thread-safe lazy initialization:
186 /// 1. Try cache with read lock - fast path (validates path matches)
187 /// 2. Load from disk with write lock - slow path with double-check
188 ///
189 /// # Path Tracking
190 /// - Cache stores (`PathBuf`, `Arc<CodeGraph>`) to track which directory was loaded
191 /// - If cached path != requested path, cache is invalidated and reloaded
192 ///
193 /// # Errors
194 ///
195 /// Returns an error if loading the graph from disk fails.
196 pub(crate) fn get_or_load_graph(&self, dir: &Path) -> Result<Option<Arc<CodeGraph>>> {
197 // Canonicalize the path for consistent comparisons
198 let canonical_dir = dir.canonicalize().unwrap_or_else(|_| dir.to_path_buf());
199
200 // Fast path: Try cache (read lock - allows concurrent reads)
201 {
202 let cache = self.graph_cache.read();
203 if let Some((cached_path, graph)) = cache.as_ref()
204 && cached_path == &canonical_dir
205 {
206 return Ok(Some(Arc::clone(graph)));
207 }
208 // Path mismatch - cache will be invalidated in slow path
209 }
210
211 // Slow path: Load and cache (write lock - exclusive access)
212 let mut cache = self.graph_cache.write();
213
214 // Double-check: another thread might have loaded while we waited for write lock
215 if let Some((cached_path, graph)) = cache.as_ref() {
216 if cached_path == &canonical_dir {
217 return Ok(Some(Arc::clone(graph)));
218 }
219 // Path mismatch - invalidate cache and reload below
220 log::debug!(
221 "Graph cache invalidated due to path mismatch. Old: {}, New: {}",
222 cached_path.display(),
223 canonical_dir.display()
224 );
225 }
226
227 // Actually load from disk
228 let storage = crate::graph::unified::persistence::GraphStorage::new(&canonical_dir);
229
230 if !storage.exists() {
231 // No manifest → no complete index
232 let auto_index_var = std::env::var("SQRY_AUTO_INDEX").unwrap_or_default();
233 if auto_index_var == "false" || auto_index_var == "0" {
234 *cache = None;
235 return Ok(None);
236 }
237
238 log::info!(
239 "No graph found at {}, auto-building index",
240 canonical_dir.display()
241 );
242 // Release write lock before the heavy build operation
243 drop(cache);
244
245 let config = crate::graph::unified::build::BuildConfig::default();
246 let (graph, _build_result) = crate::graph::unified::build::build_and_persist_graph(
247 &canonical_dir,
248 &self.plugin_manager,
249 &config,
250 "cli:auto_index",
251 )?;
252 let arc_graph = Arc::new(graph);
253
254 let mut cache = self.graph_cache.write();
255 *cache = Some((canonical_dir, Arc::clone(&arc_graph)));
256 return Ok(Some(arc_graph));
257 }
258
259 // Manifest exists → try loading snapshot
260 log::debug!(
261 "Loading CodeGraph from: {}",
262 storage.snapshot_path().display()
263 );
264
265 match load_from_path(storage.snapshot_path(), Some(&self.plugin_manager)) {
266 Ok(graph) => {
267 let arc_graph = Arc::new(graph);
268 *cache = Some((canonical_dir, Arc::clone(&arc_graph)));
269 Ok(Some(arc_graph))
270 }
271 Err(e) => {
272 // Load failed (snapshot missing/corrupt) → auto-rebuild if enabled
273 let auto_index_var = std::env::var("SQRY_AUTO_INDEX").unwrap_or_default();
274 if auto_index_var == "false" || auto_index_var == "0" {
275 return Err(e.into());
276 }
277 log::warn!("Graph load failed ({e}), auto-rebuilding index");
278 // Release write lock before the heavy rebuild
279 drop(cache);
280
281 let config = crate::graph::unified::build::BuildConfig::default();
282 let (graph, _build_result) = crate::graph::unified::build::build_and_persist_graph(
283 &canonical_dir,
284 &self.plugin_manager,
285 &config,
286 "cli:auto_index",
287 )?;
288 let arc_graph = Arc::new(graph);
289
290 let mut cache = self.graph_cache.write();
291 *cache = Some((canonical_dir, Arc::clone(&arc_graph)));
292 Ok(Some(arc_graph))
293 }
294 }
295 }
296
297 /// Get cache statistics (for monitoring/debugging)
298 #[must_use]
299 pub fn cache_stats(&self) -> (CacheStats, CacheStats) {
300 (self.ast_parse_cache.stats(), self.result_cache.stats())
301 }
302
303 /// Get query execution plan for --explain
304 ///
305 /// Parses the query and returns detailed execution plan with timing,
306 /// cache status, and optimization information.
307 ///
308 /// # Errors
309 ///
310 /// Returns [`anyhow::Error`] when query parsing, validation, or optimization fails.
311 pub fn get_query_plan(&self, query_str: &str) -> Result<QueryPlan> {
312 let start = Instant::now();
313
314 // Step 1: Parse query (boolean AST)
315 let parse_start = Instant::now();
316
317 let parsed = self.parse_query_ast(query_str)?;
318 let registry = self.build_registry();
319 let optimizer = crate::query::optimizer::Optimizer::new(registry);
320 let optimized_query = optimizer.optimize_query((*parsed.ast).clone());
321
322 let optimized_query_str = format!("{:?}", optimized_query.root);
323 let parse_step_name = "Parse query (boolean)";
324 let steps_prefix = vec![
325 (parse_step_name, 0),
326 ("Validate fields", 0),
327 ("Optimize AST", 0),
328 ];
329
330 // Duration beyond u64::MAX ms (~584 million years) is impossible; clamp to max
331 let parse_time = parse_start
332 .elapsed()
333 .as_millis()
334 .try_into()
335 .unwrap_or(u64::MAX);
336
337 // Get cache status
338 let (parse_stats, result_stats) = self.cache_stats();
339 let cache_status = CacheStatus {
340 parse_cache_hit: parse_stats.hits > 0,
341 result_cache_hit: result_stats.hits > 0,
342 };
343
344 // Build execution steps
345 let mut steps = Vec::new();
346 let mut step_num = 1;
347
348 for (operation, result_count) in steps_prefix {
349 steps.push(ExecutionStep {
350 step_num,
351 operation: operation.to_string(),
352 result_count,
353 time_ms: if step_num == 1 { parse_time } else { 0 },
354 });
355 step_num += 1;
356 }
357
358 // Add graph lookup step
359 steps.push(ExecutionStep {
360 step_num,
361 operation: "CodeGraph lookup".to_string(),
362 result_count: 0,
363 time_ms: 0,
364 });
365
366 // Duration beyond u64::MAX ms is impossible; clamp to max
367 let total_time = start.elapsed().as_millis().try_into().unwrap_or(u64::MAX);
368
369 Ok(QueryPlan::new(
370 query_str.to_string(),
371 optimized_query_str,
372 steps,
373 total_time,
374 true, // Always uses CodeGraph
375 cache_status,
376 ))
377 }
378
379 /// Clear all caches (for testing)
380 #[cfg(test)]
381 pub fn clear_caches(&self) {
382 self.ast_parse_cache.clear();
383 self.result_cache.clear();
384 }
385
386 /// Parse query string using boolean AST parser with caching
387 ///
388 /// This method:
389 /// 1. Checks AST parse cache for existing `ParsedQuery`
390 /// 2. On cache miss: parses query, validates, extracts repo filter, normalizes
391 /// 3. Caches the `ParsedQuery` for future reuse
392 ///
393 /// # Arguments
394 ///
395 /// * `query_str` - Query string to parse (boolean syntax)
396 ///
397 /// # Returns
398 ///
399 /// * `Ok(Arc<ParsedQuery>)` - Cached or freshly parsed query
400 /// * `Err(...)` - Parse error, validation error, or repo filter error
401 ///
402 /// # Performance
403 ///
404 /// - Cache hit: ~15-20ns (Arc clone + hash lookup)
405 /// - Cache miss: ~1.6µs (lex + parse + validate + normalize)
406 ///
407 /// # Example
408 ///
409 /// ```ignore
410 /// let executor = QueryExecutor::new();
411 /// let parsed = executor.parse_query_ast("kind:function AND name:test")?;
412 /// // parsed.ast contains the boolean expression
413 /// // parsed.repo_filter contains any repo: predicates
414 /// // parsed.normalized is the cache key (repo predicates stripped)
415 /// ```
416 ///
417 /// # Errors
418 ///
419 /// Returns [`anyhow::Error`] when parsing, validation, or normalization fails.
420 pub fn parse_query_ast(&self, query_str: &str) -> Result<Arc<crate::query::ParsedQuery>> {
421 // Try cache first
422 if let Some(cached_parsed) = self.ast_parse_cache.get(query_str) {
423 log::trace!("AST parse cache HIT for: {query_str}");
424 return Ok(cached_parsed);
425 }
426
427 log::trace!("AST parse cache MISS, parsing: {query_str}");
428
429 // Parse query using boolean parser
430 let ast = crate::query::parser_new::Parser::parse_query(query_str)
431 .map_err(|err| err.with_source(query_str))?;
432
433 let registry = self.build_registry();
434
435 // Validate AST against registry (with normalization)
436 let validator =
437 crate::query::validator::Validator::with_options(registry, self.validation_options);
438 let mut normalized_ast = ast.clone();
439 normalized_ast.root = match validator.normalize_expr(&ast.root) {
440 Ok(root) => root,
441 Err(validation_err) => {
442 // Wrap normalization error with source context for rich diagnostics
443 let query_error = crate::query::error::QueryError::Validation(validation_err);
444 return Err(query_error.with_source(query_str).into());
445 }
446 };
447 if let Err(validation_err) = validator.validate(&normalized_ast.root) {
448 // Wrap validation error with source context for rich diagnostics
449 let query_error = crate::query::error::QueryError::Validation(validation_err);
450 return Err(query_error.with_source(query_str).into());
451 }
452
453 // Create ParsedQuery (extracts repo filter, normalizes AST)
454 let parsed = crate::query::ParsedQuery::from_ast(Arc::new(normalized_ast))?;
455
456 // Cache the ParsedQuery for future reuse
457 let arc_parsed = Arc::new(parsed);
458 self.ast_parse_cache
459 .insert_arc(query_str.to_string(), Arc::clone(&arc_parsed));
460
461 Ok(arc_parsed)
462 }
463}
464
465impl Default for QueryExecutor {
466 fn default() -> Self {
467 Self::new()
468 }
469}
470
471impl QueryExecutor {
472 /// Execute query using `CodeGraph`.
473 ///
474 /// Loads the `CodeGraph` from disk (or cache) and evaluates the query
475 /// directly against graph nodes.
476 ///
477 /// # Arguments
478 ///
479 /// * `query` - The query string to parse and execute
480 /// * `path` - Directory path containing the `.sqry/graph/snapshot.sqry` file
481 ///
482 /// # Returns
483 ///
484 /// `QueryResults` containing matched `NodeId`s with Arc-based accessors.
485 ///
486 /// # Errors
487 ///
488 /// Returns an error if:
489 /// - No graph exists at the path (run `sqry index` first)
490 /// - Query parsing fails
491 /// - Predicate evaluation fails
492 ///
493 /// # Example
494 ///
495 /// ```ignore
496 /// let executor = QueryExecutor::new();
497 /// let results = executor.execute_on_graph("kind:function", Path::new("/my/project"))?;
498 /// for m in results.iter() {
499 /// println!("{}: {}", m.kind().as_str(), m.name().unwrap_or_default());
500 /// }
501 /// ```
502 pub fn execute_on_graph(&self, query: &str, path: &Path) -> Result<QueryResults> {
503 self.execute_on_graph_with_variables(query, path, None)
504 }
505
506 /// Cancellable variant of [`Self::execute_on_graph`].
507 ///
508 /// The supplied `cancel` token is polled by the inner evaluator
509 /// (per-batch in `evaluate_all`, per-iteration in the rayon path
510 /// — see `A_cancellation.md` §3 + `00_contracts.md` §3.CC-1). The
511 /// non-cancellable variant is preserved for LSP/CLI call sites
512 /// that have no per-request token to plumb.
513 ///
514 /// # Errors
515 ///
516 /// As [`Self::execute_on_graph`], plus [`crate::query::error::QueryError::Cancelled`]
517 /// (wrapped in `anyhow::Error`) when the token is observed cancelled
518 /// before the evaluator returns its match list.
519 pub fn execute_on_graph_cancellable(
520 &self,
521 query: &str,
522 path: &Path,
523 cancel: &crate::query::cancellation::CancellationToken,
524 ) -> Result<QueryResults> {
525 self.execute_on_graph_with_variables_cancellable(query, path, None, cancel)
526 }
527
528 /// Execute query with variable substitution.
529 ///
530 /// Variables in the query (e.g., `$type`) are replaced with values from
531 /// the provided map before evaluation.
532 ///
533 /// # Errors
534 ///
535 /// Returns an error if graph loading, query parsing, variable resolution,
536 /// or predicate evaluation fails.
537 pub fn execute_on_graph_with_variables(
538 &self,
539 query: &str,
540 path: &Path,
541 variables: Option<&HashMap<String, String>>,
542 ) -> Result<QueryResults> {
543 // Backward-compatible: callers that don't supply a token get a
544 // never-cancelled one. Cost is one Arc<AtomicBool> alloc per
545 // call, O(1) — see `A_cancellation.md` §3.
546 self.execute_on_graph_with_variables_cancellable(
547 query,
548 path,
549 variables,
550 &crate::query::cancellation::CancellationToken::new(),
551 )
552 }
553
554 /// Cancellable variant of [`Self::execute_on_graph_with_variables`].
555 /// See [`Self::execute_on_graph_cancellable`] for the cancellation
556 /// contract.
557 ///
558 /// # Errors
559 ///
560 /// As [`Self::execute_on_graph_with_variables`], plus
561 /// [`crate::query::error::QueryError::Cancelled`] (wrapped in
562 /// `anyhow::Error`).
563 pub fn execute_on_graph_with_variables_cancellable(
564 &self,
565 query: &str,
566 path: &Path,
567 variables: Option<&HashMap<String, String>>,
568 cancel: &crate::query::cancellation::CancellationToken,
569 ) -> Result<QueryResults> {
570 let budget = crate::query::budget::QueryBudget::unbounded(cancel.clone());
571 let parsed = self.parse_query_ast(query)?;
572 let graph = self
573 .get_or_load_graph(path)?
574 .ok_or_else(|| anyhow!("No graph found. Run `sqry index {}` first.", path.display()))?;
575
576 let workspace_root = path.canonicalize().unwrap_or_else(|_| path.to_path_buf());
577
578 self.execute_evaluate_with(graph, &parsed, &workspace_root, variables, &budget)
579 }
580
581 /// Cancellable + budgeted variant of
582 /// [`Self::execute_on_graph_with_variables`]. See
583 /// [`Self::execute_on_preloaded_graph_with_budget`] for the
584 /// budget contract.
585 ///
586 /// # Errors
587 ///
588 /// As [`Self::execute_on_graph_with_variables`], plus
589 /// [`crate::query::budget::BudgetExceeded`] when the row
590 /// budget is exceeded.
591 pub fn execute_on_graph_with_variables_with_budget(
592 &self,
593 query: &str,
594 path: &Path,
595 variables: Option<&HashMap<String, String>>,
596 budget: &crate::query::budget::QueryBudget,
597 ) -> Result<QueryResults> {
598 let parsed = self.parse_query_ast(query)?;
599 let graph = self
600 .get_or_load_graph(path)?
601 .ok_or_else(|| anyhow!("No graph found. Run `sqry index {}` first.", path.display()))?;
602
603 let workspace_root = path.canonicalize().unwrap_or_else(|_| path.to_path_buf());
604
605 self.execute_evaluate_with(graph, &parsed, &workspace_root, variables, budget)
606 }
607
608 /// Execute a query against a caller-supplied `CodeGraph`, bypassing the
609 /// graph cache and on-disk loading entirely.
610 ///
611 /// This is the daemon hot-path: the caller already holds the
612 /// workspace-loaded `Arc<CodeGraph>` under a workspace lock, and we must
613 /// NOT re-enter [`Self::get_or_load_graph`] (which would hit disk and
614 /// pollute the executor's process-wide `graph_cache`). Query parsing still
615 /// goes through the AST parse cache; only the graph acquisition path is
616 /// skipped.
617 ///
618 /// The caller-supplied `workspace_root` is canonicalized by this method
619 /// (mirroring [`Self::execute_on_graph_with_variables`]); the caller need
620 /// not pre-canonicalize it. Passing the workspace root is the caller's
621 /// responsibility — it is not derived from the graph.
622 ///
623 /// # Errors
624 ///
625 /// Returns an error if query parsing, variable resolution, or predicate
626 /// evaluation fails. Unlike [`Self::execute_on_graph_with_variables`],
627 /// this method cannot produce a "no graph found" error — the graph is
628 /// always supplied by the caller.
629 pub fn execute_on_preloaded_graph(
630 &self,
631 graph: Arc<CodeGraph>,
632 query: &str,
633 workspace_root: &Path,
634 variables: Option<&HashMap<String, String>>,
635 ) -> Result<QueryResults> {
636 // Backward-compatible: callers that don't supply a token get a
637 // never-cancelled one. Cost is one Arc<AtomicBool> alloc per
638 // call, O(1) — see `A_cancellation.md` §3.
639 self.execute_on_preloaded_graph_cancellable(
640 graph,
641 query,
642 workspace_root,
643 variables,
644 &crate::query::cancellation::CancellationToken::new(),
645 )
646 }
647
648 /// Cancellable variant of [`Self::execute_on_preloaded_graph`].
649 /// See [`Self::execute_on_graph_cancellable`] for the cancellation
650 /// contract; this is the daemon hot-path entrypoint.
651 ///
652 /// # Errors
653 ///
654 /// As [`Self::execute_on_preloaded_graph`], plus
655 /// [`crate::query::error::QueryError::Cancelled`] (wrapped in
656 /// `anyhow::Error`).
657 pub fn execute_on_preloaded_graph_cancellable(
658 &self,
659 graph: Arc<CodeGraph>,
660 query: &str,
661 workspace_root: &Path,
662 variables: Option<&HashMap<String, String>>,
663 cancel: &crate::query::cancellation::CancellationToken,
664 ) -> Result<QueryResults> {
665 // The cancellable variant uses an unbounded budget — only
666 // the cooperative cancellation token is in play. Per-tool
667 // budget callers should use
668 // [`Self::execute_on_preloaded_graph_with_budget`] instead.
669 let budget = crate::query::budget::QueryBudget::unbounded(cancel.clone());
670 let parsed = self.parse_query_ast(query)?;
671 let workspace_root = workspace_root
672 .canonicalize()
673 .unwrap_or_else(|_| workspace_root.to_path_buf());
674
675 self.execute_evaluate_with(graph, &parsed, &workspace_root, variables, &budget)
676 }
677
678 /// Cancellable + budgeted variant of
679 /// [`Self::execute_on_preloaded_graph`]. Plumbs both the
680 /// per-request cancellation token AND the per-tool runtime row
681 /// budget through to the evaluator so the budget's `tick()`
682 /// hook fires inside `evaluate_all` (per `C_budget.md` §3 +
683 /// `00_contracts.md` §3.CC-2).
684 ///
685 /// `budget.cancel` MUST be the same token the wrapper holds for
686 /// deadline-driven cancellation — `QueryBudget` enforces this
687 /// at construction time via [`crate::query::budget::QueryBudget::new`].
688 ///
689 /// # Errors
690 ///
691 /// As [`Self::execute_on_preloaded_graph`], plus
692 /// [`crate::query::budget::BudgetExceeded`] (wrapped in
693 /// `anyhow::Error`) when `budget.examined` reaches `budget.max_rows`.
694 pub fn execute_on_preloaded_graph_with_budget(
695 &self,
696 graph: Arc<CodeGraph>,
697 query: &str,
698 workspace_root: &Path,
699 variables: Option<&HashMap<String, String>>,
700 budget: &crate::query::budget::QueryBudget,
701 ) -> Result<QueryResults> {
702 let parsed = self.parse_query_ast(query)?;
703 let workspace_root = workspace_root
704 .canonicalize()
705 .unwrap_or_else(|_| workspace_root.to_path_buf());
706
707 self.execute_evaluate_with(graph, &parsed, &workspace_root, variables, budget)
708 }
709
710 /// Shared evaluation body for `execute_on_graph_with_variables` and
711 /// `execute_on_preloaded_graph` (and their cancellable overloads).
712 ///
713 /// All public entrypoints differ only in how they obtain the
714 /// `Arc<CodeGraph>`; everything from variable resolution through
715 /// `evaluate_all` + result assembly is identical and lives here to
716 /// guarantee matching semantics.
717 ///
718 /// The `cancel` token is threaded into [`graph_eval::GraphEvalContext`]
719 /// so the inner [`graph_eval::evaluate_all`] hot loop polls it
720 /// cooperatively (per `A_cancellation.md` §3 + `00_contracts.md`
721 /// §3.CC-1). Non-cancellable callers pass a freshly-allocated
722 /// never-cancelled token; the inner loop's `is_cancelled()` poll
723 /// is one cache-warm atomic load per batch and never observes a
724 /// flipped flag in that case.
725 fn execute_evaluate_with(
726 &self,
727 graph: Arc<CodeGraph>,
728 parsed: &crate::query::ParsedQuery,
729 workspace_root: &Path,
730 variables: Option<&HashMap<String, String>>,
731 budget: &crate::query::budget::QueryBudget,
732 ) -> Result<QueryResults> {
733 // Resolve variables if provided. The cost gate inspects the
734 // post-substitution AST shape so a `$var` resolved to
735 // `.*foo.*` is classified as the literal regex, not as a
736 // `Variable` (per `B_cost_gate.md` §2 "Designed shared body").
737 let effective_root = if let Some(vars) = variables {
738 crate::query::types::resolve_variables(&parsed.ast.root, vars)
739 .map_err(|e| anyhow!("Variable resolution error: {e}"))?
740 } else {
741 parsed.ast.root.clone()
742 };
743
744 // Pre-flight cost gate (P0-1 mitigation). Single-point
745 // insertion at the shared body covers all 28 callers across
746 // `execute_on_preloaded_graph` (17), `execute_on_graph` (7),
747 // and `execute_on_graph_with_variables` (4) by construction
748 // — including the previously-missed MCP `hierarchical_search`
749 // path. Runs BEFORE `GraphEvalContext::new` / `evaluate_all`
750 // so the executor never starts a rayon scan on a rejected
751 // query, freeing the spawn_blocking pool from broad-shape
752 // load. See `B_cost_gate.md` §2 + `00_contracts.md` §3.CC-2.
753 crate::query::cost_gate::check_query(
754 &effective_root,
755 graph.node_count(),
756 &self.cost_gate_config,
757 )
758 .map_err(anyhow::Error::from)?;
759
760 // Threading both cancel + budget into the evaluator: the
761 // budget owns the cancellation token internally (per
762 // `C_budget.md` §3 + `00_contracts.md` §3.CC-1), so
763 // `with_budget` installs both atomically.
764 let mut ctx = graph_eval::GraphEvalContext::new(&graph, &self.plugin_manager)
765 .with_workspace_root(workspace_root)
766 .with_parallel_disabled(self.disable_parallel)
767 .with_budget(budget.clone());
768
769 let matches = graph_eval::evaluate_all(&mut ctx, &effective_root)?;
770 let mut results =
771 QueryResults::new(graph, matches).with_workspace_root(workspace_root.to_path_buf());
772 results.sort_by_location();
773 Ok(results)
774 }
775
776 /// Execute a join query, returning matched node pairs.
777 ///
778 /// The query must have a `Join` expression at its root level.
779 ///
780 /// # Errors
781 ///
782 /// Returns an error if graph loading, query parsing, or join evaluation fails.
783 pub fn execute_join(
784 &self,
785 query: &str,
786 path: &Path,
787 variables: Option<&HashMap<String, String>>,
788 ) -> Result<JoinResults> {
789 let parsed = self.parse_query_ast(query)?;
790 let graph = self
791 .get_or_load_graph(path)?
792 .ok_or_else(|| anyhow!("No graph found. Run `sqry index {}` first.", path.display()))?;
793
794 let workspace_root = path.canonicalize().unwrap_or_else(|_| path.to_path_buf());
795
796 // Resolve variables if provided
797 let effective_root = if let Some(vars) = variables {
798 crate::query::types::resolve_variables(&parsed.ast.root, vars)
799 .map_err(|e| anyhow!("Variable resolution error: {e}"))?
800 } else {
801 parsed.ast.root.clone()
802 };
803
804 let Expr::Join(join) = &effective_root else {
805 return Err(anyhow!(
806 "Expected a join expression (e.g., `(kind:function) CALLS (kind:function)`)"
807 ));
808 };
809
810 let ctx = graph_eval::GraphEvalContext::new(&graph, &self.plugin_manager)
811 .with_workspace_root(&workspace_root)
812 .with_parallel_disabled(self.disable_parallel);
813
814 let eval_result = graph_eval::evaluate_join(&ctx, join, None)?;
815 let results = JoinResults::new(
816 graph,
817 eval_result.pairs,
818 join.edge.clone(),
819 eval_result.truncated,
820 )
821 .with_workspace_root(workspace_root);
822 Ok(results)
823 }
824
825 /// Execute a pipeline query (base query + aggregation stages).
826 ///
827 /// # Errors
828 ///
829 /// Returns an error if graph loading, query parsing, or pipeline execution fails.
830 pub fn execute_pipeline(
831 &self,
832 query: &str,
833 stages: &[PipelineStage],
834 path: &Path,
835 variables: Option<&HashMap<String, String>>,
836 ) -> Result<Vec<AggregationResult>> {
837 let results = self.execute_on_graph_with_variables(query, path, variables)?;
838
839 let mut aggregations = Vec::new();
840 for stage in stages {
841 aggregations.push(super::pipeline::execute_pipeline_stage(&results, stage));
842 }
843 Ok(aggregations)
844 }
845
846 /// Execute a full query that may be a regular query, join, or pipeline.
847 ///
848 /// Detects the query type and dispatches to the appropriate executor.
849 ///
850 /// # Errors
851 ///
852 /// Returns an error if graph loading, query parsing, or execution fails.
853 pub fn execute_full(
854 &self,
855 query: &str,
856 path: &Path,
857 variables: Option<&HashMap<String, String>>,
858 ) -> Result<QueryOutput> {
859 let parsed = self.parse_query_ast(query)?;
860
861 // Check for join at the root level
862 if matches!(&parsed.ast.root, Expr::Join(_)) {
863 let join_results = self.execute_join(query, path, variables)?;
864 return Ok(QueryOutput::Join(join_results));
865 }
866
867 // Check for pipeline
868 if let Some(pipeline) = crate::query::parser_new::Parser::parse_pipeline_query(query)
869 .map_err(|err| err.with_source(query))?
870 {
871 let aggregations = self.execute_pipeline(query, &pipeline.stages, path, variables)?;
872 // Return the last aggregation result (chained stages reduce to final result)
873 if let Some(last) = aggregations.into_iter().last() {
874 return Ok(QueryOutput::Aggregation(last));
875 }
876 }
877
878 // Regular query
879 let results = self.execute_on_graph_with_variables(query, path, variables)?;
880 Ok(QueryOutput::Results(results))
881 }
882}