Skip to main content

sqry_core/query/executor/
core.rs

1//! Query executor core - main `QueryExecutor` struct and orchestration
2//!
3//! # Query Execution Pipeline
4//!
5//! The executor processes queries via `CodeGraph` in three stages:
6//!
7//! 1. **Parse with cache** (`parse_query_ast`):
8//!    - Check AST parse cache for `Arc<ParsedQuery>` (15-20ns cache hit)
9//!    - On miss: parse query (1.6µs) and cache `Arc<ParsedQuery>`
10//!    - Returns `Arc<ParsedQuery>` for zero-copy sharing
11//!
12//! 2. **Graph evaluation** (`execute_on_graph`):
13//!    - Load `CodeGraph` from `.sqry/graph/snapshot.sqry`
14//!    - Evaluate predicates directly on graph nodes
15//!    - Handle relation queries (callers, callees, etc.) via graph edges
16//!
17//! 3. **Results**:
18//!    - Return `QueryResults` with Arc-based accessors
19//!    - Results sorted by file location
20//!
21//! # Arc-based Parse Cache
22//!
23//! The AST parse cache stores `Arc<ParsedQuery>` to enable zero-copy cache hits.
24//! This provides:
25//!
26//! - **100× speedup** for cache hits (1.6µs → 16ns)
27//! - **Thread-safe sharing** of parsed queries
28//! - **Memory efficiency** (one `ParsedQuery` instance per unique query string)
29//!
30//! See [`AstParseCache`](crate::query::cache::AstParseCache) for benchmarks and details.
31
32use super::graph_eval;
33use crate::graph::unified::concurrent::CodeGraph;
34use crate::graph::unified::persistence::load_from_path;
35use crate::normalizer::MetadataNormalizer;
36use crate::plugin::PluginManager;
37use crate::query::cache::{CacheStats, ResultCache};
38use crate::query::pipeline::AggregationResult;
39use crate::query::plan::{CacheStatus, ExecutionStep, QueryPlan};
40use crate::query::results::{JoinResults, QueryOutput, QueryResults};
41use crate::query::types::{Expr, PipelineStage};
42use anyhow::{Result, anyhow};
43use parking_lot::RwLock;
44use std::collections::HashMap;
45use std::path::{Path, PathBuf};
46use std::sync::Arc;
47use std::time::Instant;
48
/// Thread-safe cache for a loaded code graph keyed by its canonical path.
///
/// The `Option` is `None` when no graph has been loaded yet. Once loaded, it stores
/// both the path the graph was loaded from and the graph itself.
///
/// The cache holds at most one `(path, graph)` entry, so loading a graph for a
/// different directory replaces the previous entry. The graph is stored behind an
/// `Arc` so callers can keep using it after the lock has been released.
pub(crate) type GraphCache = Arc<RwLock<Option<(PathBuf, Arc<CodeGraph>)>>>;
54
/// Executes queries against code using `CodeGraph`.
///
/// The executor loads a `CodeGraph` from disk and evaluates queries directly
/// against graph nodes using `execute_on_graph()`.
///
/// Parsed queries and the loaded graph are cached on the executor, so reusing
/// one instance across queries avoids repeated parsing and graph loading.
pub struct QueryExecutor {
    /// Plugin manager for this executor.
    ///
    /// Its plugins contribute extra query fields to the field registry, and it
    /// is handed to the graph load/build routines and to graph evaluation.
    pub(crate) plugin_manager: PluginManager,

    /// Thread-safe cache for loaded `CodeGraph`
    ///
    /// See [`GraphCache`] type alias for details on the caching strategy.
    pub(crate) graph_cache: GraphCache,

    /// AST parse cache (query string → `ParsedQuery`) - Boolean parser
    pub(crate) ast_parse_cache: Arc<crate::query::cache::AstParseCache>,

    /// Result cache for query results
    pub(crate) result_cache: Arc<ResultCache>,

    /// Disable parallel query execution (for A/B performance testing)
    ///
    /// Forwarded to the graph evaluation context on every execution.
    pub(crate) disable_parallel: bool,

    /// Validation options for query parsing
    pub(crate) validation_options: crate::query::validator::ValidationOptions,
}
79
80impl QueryExecutor {
81    /// Create a new query executor
82    #[must_use]
83    pub fn new() -> Self {
84        Self {
85            plugin_manager: PluginManager::new(),
86            graph_cache: Arc::new(RwLock::new(None)),
87            ast_parse_cache: Arc::new(crate::query::cache::AstParseCache::new(1000)),
88            result_cache: Arc::new(ResultCache::new(1000)),
89            disable_parallel: false,
90            validation_options: crate::query::validator::ValidationOptions::default(),
91        }
92    }
93
94    /// Create a query executor with a custom plugin manager
95    #[must_use]
96    pub fn with_plugin_manager(plugin_manager: PluginManager) -> Self {
97        Self {
98            plugin_manager,
99            graph_cache: Arc::new(RwLock::new(None)),
100            ast_parse_cache: Arc::new(crate::query::cache::AstParseCache::new(1000)),
101            result_cache: Arc::new(ResultCache::new(1000)),
102            disable_parallel: false,
103            validation_options: crate::query::validator::ValidationOptions::default(),
104        }
105    }
106
    /// Return the plugin manager used by this executor.
    ///
    /// This is a plain borrow of the executor's own manager; nothing is cloned.
    #[must_use]
    pub fn plugin_manager(&self) -> &PluginManager {
        &self.plugin_manager
    }
112
113    fn build_registry(&self) -> crate::query::registry::FieldRegistry {
114        let mut registry = crate::query::registry::FieldRegistry::with_core_fields();
115        for plugin in self.plugin_manager.plugins() {
116            let _collisions = registry.add_plugin_fields(plugin.fields());
117        }
118
119        let normalizer = MetadataNormalizer::new();
120        for (short_form, canonical) in normalizer.mappings() {
121            if registry.contains(canonical)
122                && let Some(canonical_field) = registry.get(canonical)
123            {
124                let short_field = crate::query::types::FieldDescriptor {
125                    name: short_form,
126                    field_type: canonical_field.field_type.clone(),
127                    operators: canonical_field.operators,
128                    indexed: canonical_field.indexed,
129                    doc: canonical_field.doc,
130                };
131                registry.add_field(short_field);
132            }
133        }
134
135        registry
136    }
137
138    /// Configure validation options (e.g., fuzzy field tolerance)
139    #[must_use]
140    pub fn with_validation_options(
141        mut self,
142        options: crate::query::validator::ValidationOptions,
143    ) -> Self {
144        self.validation_options = options;
145        self
146    }
147
148    /// Disable parallel query execution (for A/B performance testing)
149    #[must_use]
150    pub fn without_parallel(mut self) -> Self {
151        self.disable_parallel = true;
152        self
153    }
154
155    /// Get or load `CodeGraph` with thread-safe caching
156    ///
157    /// Uses double-checked locking pattern for thread-safe lazy initialization:
158    /// 1. Try cache with read lock - fast path (validates path matches)
159    /// 2. Load from disk with write lock - slow path with double-check
160    ///
161    /// # Path Tracking
162    /// - Cache stores (`PathBuf`, `Arc<CodeGraph>`) to track which directory was loaded
163    /// - If cached path != requested path, cache is invalidated and reloaded
164    ///
165    /// # Errors
166    ///
167    /// Returns an error if loading the graph from disk fails.
168    pub(crate) fn get_or_load_graph(&self, dir: &Path) -> Result<Option<Arc<CodeGraph>>> {
169        // Canonicalize the path for consistent comparisons
170        let canonical_dir = dir.canonicalize().unwrap_or_else(|_| dir.to_path_buf());
171
172        // Fast path: Try cache (read lock - allows concurrent reads)
173        {
174            let cache = self.graph_cache.read();
175            if let Some((cached_path, graph)) = cache.as_ref()
176                && cached_path == &canonical_dir
177            {
178                return Ok(Some(Arc::clone(graph)));
179            }
180            // Path mismatch - cache will be invalidated in slow path
181        }
182
183        // Slow path: Load and cache (write lock - exclusive access)
184        let mut cache = self.graph_cache.write();
185
186        // Double-check: another thread might have loaded while we waited for write lock
187        if let Some((cached_path, graph)) = cache.as_ref() {
188            if cached_path == &canonical_dir {
189                return Ok(Some(Arc::clone(graph)));
190            }
191            // Path mismatch - invalidate cache and reload below
192            log::debug!(
193                "Graph cache invalidated due to path mismatch. Old: {}, New: {}",
194                cached_path.display(),
195                canonical_dir.display()
196            );
197        }
198
199        // Actually load from disk
200        let storage = crate::graph::unified::persistence::GraphStorage::new(&canonical_dir);
201
202        if !storage.exists() {
203            // No manifest → no complete index
204            let auto_index_var = std::env::var("SQRY_AUTO_INDEX").unwrap_or_default();
205            if auto_index_var == "false" || auto_index_var == "0" {
206                *cache = None;
207                return Ok(None);
208            }
209
210            log::info!(
211                "No graph found at {}, auto-building index",
212                canonical_dir.display()
213            );
214            // Release write lock before the heavy build operation
215            drop(cache);
216
217            let config = crate::graph::unified::build::BuildConfig::default();
218            let (graph, _build_result) = crate::graph::unified::build::build_and_persist_graph(
219                &canonical_dir,
220                &self.plugin_manager,
221                &config,
222                "cli:auto_index",
223            )?;
224            let arc_graph = Arc::new(graph);
225
226            let mut cache = self.graph_cache.write();
227            *cache = Some((canonical_dir, Arc::clone(&arc_graph)));
228            return Ok(Some(arc_graph));
229        }
230
231        // Manifest exists → try loading snapshot
232        log::debug!(
233            "Loading CodeGraph from: {}",
234            storage.snapshot_path().display()
235        );
236
237        match load_from_path(storage.snapshot_path(), Some(&self.plugin_manager)) {
238            Ok(graph) => {
239                let arc_graph = Arc::new(graph);
240                *cache = Some((canonical_dir, Arc::clone(&arc_graph)));
241                Ok(Some(arc_graph))
242            }
243            Err(e) => {
244                // Load failed (snapshot missing/corrupt) → auto-rebuild if enabled
245                let auto_index_var = std::env::var("SQRY_AUTO_INDEX").unwrap_or_default();
246                if auto_index_var == "false" || auto_index_var == "0" {
247                    return Err(e.into());
248                }
249                log::warn!("Graph load failed ({e}), auto-rebuilding index");
250                // Release write lock before the heavy rebuild
251                drop(cache);
252
253                let config = crate::graph::unified::build::BuildConfig::default();
254                let (graph, _build_result) = crate::graph::unified::build::build_and_persist_graph(
255                    &canonical_dir,
256                    &self.plugin_manager,
257                    &config,
258                    "cli:auto_index",
259                )?;
260                let arc_graph = Arc::new(graph);
261
262                let mut cache = self.graph_cache.write();
263                *cache = Some((canonical_dir, Arc::clone(&arc_graph)));
264                Ok(Some(arc_graph))
265            }
266        }
267    }
268
269    /// Get cache statistics (for monitoring/debugging)
270    #[must_use]
271    pub fn cache_stats(&self) -> (CacheStats, CacheStats) {
272        (self.ast_parse_cache.stats(), self.result_cache.stats())
273    }
274
275    /// Get query execution plan for --explain
276    ///
277    /// Parses the query and returns detailed execution plan with timing,
278    /// cache status, and optimization information.
279    ///
280    /// # Errors
281    ///
282    /// Returns [`anyhow::Error`] when query parsing, validation, or optimization fails.
283    pub fn get_query_plan(&self, query_str: &str) -> Result<QueryPlan> {
284        let start = Instant::now();
285
286        // Step 1: Parse query (boolean AST)
287        let parse_start = Instant::now();
288
289        let parsed = self.parse_query_ast(query_str)?;
290        let registry = self.build_registry();
291        let optimizer = crate::query::optimizer::Optimizer::new(registry);
292        let optimized_query = optimizer.optimize_query((*parsed.ast).clone());
293
294        let optimized_query_str = format!("{:?}", optimized_query.root);
295        let parse_step_name = "Parse query (boolean)";
296        let steps_prefix = vec![
297            (parse_step_name, 0),
298            ("Validate fields", 0),
299            ("Optimize AST", 0),
300        ];
301
302        // Duration beyond u64::MAX ms (~584 million years) is impossible; clamp to max
303        let parse_time = parse_start
304            .elapsed()
305            .as_millis()
306            .try_into()
307            .unwrap_or(u64::MAX);
308
309        // Get cache status
310        let (parse_stats, result_stats) = self.cache_stats();
311        let cache_status = CacheStatus {
312            parse_cache_hit: parse_stats.hits > 0,
313            result_cache_hit: result_stats.hits > 0,
314        };
315
316        // Build execution steps
317        let mut steps = Vec::new();
318        let mut step_num = 1;
319
320        for (operation, result_count) in steps_prefix {
321            steps.push(ExecutionStep {
322                step_num,
323                operation: operation.to_string(),
324                result_count,
325                time_ms: if step_num == 1 { parse_time } else { 0 },
326            });
327            step_num += 1;
328        }
329
330        // Add graph lookup step
331        steps.push(ExecutionStep {
332            step_num,
333            operation: "CodeGraph lookup".to_string(),
334            result_count: 0,
335            time_ms: 0,
336        });
337
338        // Duration beyond u64::MAX ms is impossible; clamp to max
339        let total_time = start.elapsed().as_millis().try_into().unwrap_or(u64::MAX);
340
341        Ok(QueryPlan::new(
342            query_str.to_string(),
343            optimized_query_str,
344            steps,
345            total_time,
346            true, // Always uses CodeGraph
347            cache_status,
348        ))
349    }
350
351    /// Clear all caches (for testing)
352    #[cfg(test)]
353    pub fn clear_caches(&self) {
354        self.ast_parse_cache.clear();
355        self.result_cache.clear();
356    }
357
358    /// Parse query string using boolean AST parser with caching
359    ///
360    /// This method:
361    /// 1. Checks AST parse cache for existing `ParsedQuery`
362    /// 2. On cache miss: parses query, validates, extracts repo filter, normalizes
363    /// 3. Caches the `ParsedQuery` for future reuse
364    ///
365    /// # Arguments
366    ///
367    /// * `query_str` - Query string to parse (boolean syntax)
368    ///
369    /// # Returns
370    ///
371    /// * `Ok(Arc<ParsedQuery>)` - Cached or freshly parsed query
372    /// * `Err(...)` - Parse error, validation error, or repo filter error
373    ///
374    /// # Performance
375    ///
376    /// - Cache hit: ~15-20ns (Arc clone + hash lookup)
377    /// - Cache miss: ~1.6µs (lex + parse + validate + normalize)
378    ///
379    /// # Example
380    ///
381    /// ```ignore
382    /// let executor = QueryExecutor::new();
383    /// let parsed = executor.parse_query_ast("kind:function AND name:test")?;
384    /// // parsed.ast contains the boolean expression
385    /// // parsed.repo_filter contains any repo: predicates
386    /// // parsed.normalized is the cache key (repo predicates stripped)
387    /// ```
388    ///
389    /// # Errors
390    ///
391    /// Returns [`anyhow::Error`] when parsing, validation, or normalization fails.
392    pub fn parse_query_ast(&self, query_str: &str) -> Result<Arc<crate::query::ParsedQuery>> {
393        // Try cache first
394        if let Some(cached_parsed) = self.ast_parse_cache.get(query_str) {
395            log::trace!("AST parse cache HIT for: {query_str}");
396            return Ok(cached_parsed);
397        }
398
399        log::trace!("AST parse cache MISS, parsing: {query_str}");
400
401        // Parse query using boolean parser
402        let ast = crate::query::parser_new::Parser::parse_query(query_str)
403            .map_err(|err| err.with_source(query_str))?;
404
405        let registry = self.build_registry();
406
407        // Validate AST against registry (with normalization)
408        let validator =
409            crate::query::validator::Validator::with_options(registry, self.validation_options);
410        let mut normalized_ast = ast.clone();
411        normalized_ast.root = match validator.normalize_expr(&ast.root) {
412            Ok(root) => root,
413            Err(validation_err) => {
414                // Wrap normalization error with source context for rich diagnostics
415                let query_error = crate::query::error::QueryError::Validation(validation_err);
416                return Err(query_error.with_source(query_str).into());
417            }
418        };
419        if let Err(validation_err) = validator.validate(&normalized_ast.root) {
420            // Wrap validation error with source context for rich diagnostics
421            let query_error = crate::query::error::QueryError::Validation(validation_err);
422            return Err(query_error.with_source(query_str).into());
423        }
424
425        // Create ParsedQuery (extracts repo filter, normalizes AST)
426        let parsed = crate::query::ParsedQuery::from_ast(Arc::new(normalized_ast))?;
427
428        // Cache the ParsedQuery for future reuse
429        let arc_parsed = Arc::new(parsed);
430        self.ast_parse_cache
431            .insert_arc(query_str.to_string(), Arc::clone(&arc_parsed));
432
433        Ok(arc_parsed)
434    }
435}
436
437impl Default for QueryExecutor {
438    fn default() -> Self {
439        Self::new()
440    }
441}
442
443impl QueryExecutor {
444    /// Execute query using `CodeGraph`.
445    ///
446    /// Loads the `CodeGraph` from disk (or cache) and evaluates the query
447    /// directly against graph nodes.
448    ///
449    /// # Arguments
450    ///
451    /// * `query` - The query string to parse and execute
452    /// * `path` - Directory path containing the `.sqry/graph/snapshot.sqry` file
453    ///
454    /// # Returns
455    ///
456    /// `QueryResults` containing matched `NodeId`s with Arc-based accessors.
457    ///
458    /// # Errors
459    ///
460    /// Returns an error if:
461    /// - No graph exists at the path (run `sqry index` first)
462    /// - Query parsing fails
463    /// - Predicate evaluation fails
464    ///
465    /// # Example
466    ///
467    /// ```ignore
468    /// let executor = QueryExecutor::new();
469    /// let results = executor.execute_on_graph("kind:function", Path::new("/my/project"))?;
470    /// for m in results.iter() {
471    ///     println!("{}: {}", m.kind().as_str(), m.name().unwrap_or_default());
472    /// }
473    /// ```
474    pub fn execute_on_graph(&self, query: &str, path: &Path) -> Result<QueryResults> {
475        self.execute_on_graph_with_variables(query, path, None)
476    }
477
478    /// Execute query with variable substitution.
479    ///
480    /// Variables in the query (e.g., `$type`) are replaced with values from
481    /// the provided map before evaluation.
482    ///
483    /// # Errors
484    ///
485    /// Returns an error if graph loading, query parsing, variable resolution,
486    /// or predicate evaluation fails.
487    pub fn execute_on_graph_with_variables(
488        &self,
489        query: &str,
490        path: &Path,
491        variables: Option<&HashMap<String, String>>,
492    ) -> Result<QueryResults> {
493        let parsed = self.parse_query_ast(query)?;
494        let graph = self
495            .get_or_load_graph(path)?
496            .ok_or_else(|| anyhow!("No graph found. Run `sqry index {}` first.", path.display()))?;
497
498        let workspace_root = path.canonicalize().unwrap_or_else(|_| path.to_path_buf());
499
500        // Resolve variables if provided
501        let effective_root = if let Some(vars) = variables {
502            crate::query::types::resolve_variables(&parsed.ast.root, vars)
503                .map_err(|e| anyhow!("Variable resolution error: {e}"))?
504        } else {
505            parsed.ast.root.clone()
506        };
507
508        let mut ctx = graph_eval::GraphEvalContext::new(&graph, &self.plugin_manager)
509            .with_workspace_root(&workspace_root)
510            .with_parallel_disabled(self.disable_parallel);
511
512        // Precompute imports once per unique imports: target before evaluation
513        for target in graph_eval::collect_import_targets(&effective_root) {
514            ctx.precompute_imports(&target);
515        }
516
517        let matches = graph_eval::evaluate_all(&mut ctx, &effective_root)?;
518        let mut results = QueryResults::new(graph, matches).with_workspace_root(workspace_root);
519        results.sort_by_location();
520        Ok(results)
521    }
522
523    /// Execute a join query, returning matched node pairs.
524    ///
525    /// The query must have a `Join` expression at its root level.
526    ///
527    /// # Errors
528    ///
529    /// Returns an error if graph loading, query parsing, or join evaluation fails.
530    pub fn execute_join(
531        &self,
532        query: &str,
533        path: &Path,
534        variables: Option<&HashMap<String, String>>,
535    ) -> Result<JoinResults> {
536        let parsed = self.parse_query_ast(query)?;
537        let graph = self
538            .get_or_load_graph(path)?
539            .ok_or_else(|| anyhow!("No graph found. Run `sqry index {}` first.", path.display()))?;
540
541        let workspace_root = path.canonicalize().unwrap_or_else(|_| path.to_path_buf());
542
543        // Resolve variables if provided
544        let effective_root = if let Some(vars) = variables {
545            crate::query::types::resolve_variables(&parsed.ast.root, vars)
546                .map_err(|e| anyhow!("Variable resolution error: {e}"))?
547        } else {
548            parsed.ast.root.clone()
549        };
550
551        let Expr::Join(join) = &effective_root else {
552            return Err(anyhow!(
553                "Expected a join expression (e.g., `(kind:function) CALLS (kind:function)`)"
554            ));
555        };
556
557        let ctx = graph_eval::GraphEvalContext::new(&graph, &self.plugin_manager)
558            .with_workspace_root(&workspace_root)
559            .with_parallel_disabled(self.disable_parallel);
560
561        let eval_result = graph_eval::evaluate_join(&ctx, join, None)?;
562        let results = JoinResults::new(
563            graph,
564            eval_result.pairs,
565            join.edge.clone(),
566            eval_result.truncated,
567        )
568        .with_workspace_root(workspace_root);
569        Ok(results)
570    }
571
572    /// Execute a pipeline query (base query + aggregation stages).
573    ///
574    /// # Errors
575    ///
576    /// Returns an error if graph loading, query parsing, or pipeline execution fails.
577    pub fn execute_pipeline(
578        &self,
579        query: &str,
580        stages: &[PipelineStage],
581        path: &Path,
582        variables: Option<&HashMap<String, String>>,
583    ) -> Result<Vec<AggregationResult>> {
584        let results = self.execute_on_graph_with_variables(query, path, variables)?;
585
586        let mut aggregations = Vec::new();
587        for stage in stages {
588            aggregations.push(super::pipeline::execute_pipeline_stage(&results, stage));
589        }
590        Ok(aggregations)
591    }
592
593    /// Execute a full query that may be a regular query, join, or pipeline.
594    ///
595    /// Detects the query type and dispatches to the appropriate executor.
596    ///
597    /// # Errors
598    ///
599    /// Returns an error if graph loading, query parsing, or execution fails.
600    pub fn execute_full(
601        &self,
602        query: &str,
603        path: &Path,
604        variables: Option<&HashMap<String, String>>,
605    ) -> Result<QueryOutput> {
606        let parsed = self.parse_query_ast(query)?;
607
608        // Check for join at the root level
609        if matches!(&parsed.ast.root, Expr::Join(_)) {
610            let join_results = self.execute_join(query, path, variables)?;
611            return Ok(QueryOutput::Join(join_results));
612        }
613
614        // Check for pipeline
615        if let Some(pipeline) = crate::query::parser_new::Parser::parse_pipeline_query(query)
616            .map_err(|err| err.with_source(query))?
617        {
618            let aggregations = self.execute_pipeline(query, &pipeline.stages, path, variables)?;
619            // Return the last aggregation result (chained stages reduce to final result)
620            if let Some(last) = aggregations.into_iter().last() {
621                return Ok(QueryOutput::Aggregation(last));
622            }
623        }
624
625        // Regular query
626        let results = self.execute_on_graph_with_variables(query, path, variables)?;
627        Ok(QueryOutput::Results(results))
628    }
629}