sqry_core/query/executor/
core.rs1use super::graph_eval;
33use crate::graph::unified::concurrent::CodeGraph;
34use crate::graph::unified::persistence::load_from_path;
35use crate::normalizer::MetadataNormalizer;
36use crate::plugin::PluginManager;
37use crate::query::cache::{CacheStats, ResultCache};
38use crate::query::pipeline::AggregationResult;
39use crate::query::plan::{CacheStatus, ExecutionStep, QueryPlan};
40use crate::query::results::{JoinResults, QueryOutput, QueryResults};
41use crate::query::types::{Expr, PipelineStage};
42use anyhow::{Result, anyhow};
43use parking_lot::RwLock;
44use std::collections::HashMap;
45use std::path::{Path, PathBuf};
46use std::sync::Arc;
47use std::time::Instant;
48
49pub(crate) type GraphCache = Arc<RwLock<Option<(PathBuf, Arc<CodeGraph>)>>>;
54
55pub struct QueryExecutor {
60 pub(crate) plugin_manager: PluginManager,
61
62 pub(crate) graph_cache: GraphCache,
66
67 pub(crate) ast_parse_cache: Arc<crate::query::cache::AstParseCache>,
69
70 pub(crate) result_cache: Arc<ResultCache>,
72
73 pub(crate) disable_parallel: bool,
75
76 pub(crate) validation_options: crate::query::validator::ValidationOptions,
78}
79
80impl QueryExecutor {
81 #[must_use]
83 pub fn new() -> Self {
84 Self {
85 plugin_manager: PluginManager::new(),
86 graph_cache: Arc::new(RwLock::new(None)),
87 ast_parse_cache: Arc::new(crate::query::cache::AstParseCache::new(1000)),
88 result_cache: Arc::new(ResultCache::new(1000)),
89 disable_parallel: false,
90 validation_options: crate::query::validator::ValidationOptions::default(),
91 }
92 }
93
94 #[must_use]
96 pub fn with_plugin_manager(plugin_manager: PluginManager) -> Self {
97 Self {
98 plugin_manager,
99 graph_cache: Arc::new(RwLock::new(None)),
100 ast_parse_cache: Arc::new(crate::query::cache::AstParseCache::new(1000)),
101 result_cache: Arc::new(ResultCache::new(1000)),
102 disable_parallel: false,
103 validation_options: crate::query::validator::ValidationOptions::default(),
104 }
105 }
106
107 #[must_use]
109 pub fn plugin_manager(&self) -> &PluginManager {
110 &self.plugin_manager
111 }
112
113 fn build_registry(&self) -> crate::query::registry::FieldRegistry {
114 let mut registry = crate::query::registry::FieldRegistry::with_core_fields();
115 for plugin in self.plugin_manager.plugins() {
116 let _collisions = registry.add_plugin_fields(plugin.fields());
117 }
118
119 let normalizer = MetadataNormalizer::new();
120 for (short_form, canonical) in normalizer.mappings() {
121 if registry.contains(canonical)
122 && let Some(canonical_field) = registry.get(canonical)
123 {
124 let short_field = crate::query::types::FieldDescriptor {
125 name: short_form,
126 field_type: canonical_field.field_type.clone(),
127 operators: canonical_field.operators,
128 indexed: canonical_field.indexed,
129 doc: canonical_field.doc,
130 };
131 registry.add_field(short_field);
132 }
133 }
134
135 registry
136 }
137
138 #[must_use]
140 pub fn with_validation_options(
141 mut self,
142 options: crate::query::validator::ValidationOptions,
143 ) -> Self {
144 self.validation_options = options;
145 self
146 }
147
148 #[must_use]
150 pub fn without_parallel(mut self) -> Self {
151 self.disable_parallel = true;
152 self
153 }
154
155 pub(crate) fn get_or_load_graph(&self, dir: &Path) -> Result<Option<Arc<CodeGraph>>> {
169 let canonical_dir = dir.canonicalize().unwrap_or_else(|_| dir.to_path_buf());
171
172 {
174 let cache = self.graph_cache.read();
175 if let Some((cached_path, graph)) = cache.as_ref()
176 && cached_path == &canonical_dir
177 {
178 return Ok(Some(Arc::clone(graph)));
179 }
180 }
182
183 let mut cache = self.graph_cache.write();
185
186 if let Some((cached_path, graph)) = cache.as_ref() {
188 if cached_path == &canonical_dir {
189 return Ok(Some(Arc::clone(graph)));
190 }
191 log::debug!(
193 "Graph cache invalidated due to path mismatch. Old: {}, New: {}",
194 cached_path.display(),
195 canonical_dir.display()
196 );
197 }
198
199 let storage = crate::graph::unified::persistence::GraphStorage::new(&canonical_dir);
201
202 if !storage.exists() {
203 let auto_index_var = std::env::var("SQRY_AUTO_INDEX").unwrap_or_default();
205 if auto_index_var == "false" || auto_index_var == "0" {
206 *cache = None;
207 return Ok(None);
208 }
209
210 log::info!(
211 "No graph found at {}, auto-building index",
212 canonical_dir.display()
213 );
214 drop(cache);
216
217 let config = crate::graph::unified::build::BuildConfig::default();
218 let (graph, _build_result) = crate::graph::unified::build::build_and_persist_graph(
219 &canonical_dir,
220 &self.plugin_manager,
221 &config,
222 "cli:auto_index",
223 )?;
224 let arc_graph = Arc::new(graph);
225
226 let mut cache = self.graph_cache.write();
227 *cache = Some((canonical_dir, Arc::clone(&arc_graph)));
228 return Ok(Some(arc_graph));
229 }
230
231 log::debug!(
233 "Loading CodeGraph from: {}",
234 storage.snapshot_path().display()
235 );
236
237 match load_from_path(storage.snapshot_path(), Some(&self.plugin_manager)) {
238 Ok(graph) => {
239 let arc_graph = Arc::new(graph);
240 *cache = Some((canonical_dir, Arc::clone(&arc_graph)));
241 Ok(Some(arc_graph))
242 }
243 Err(e) => {
244 let auto_index_var = std::env::var("SQRY_AUTO_INDEX").unwrap_or_default();
246 if auto_index_var == "false" || auto_index_var == "0" {
247 return Err(e.into());
248 }
249 log::warn!("Graph load failed ({e}), auto-rebuilding index");
250 drop(cache);
252
253 let config = crate::graph::unified::build::BuildConfig::default();
254 let (graph, _build_result) = crate::graph::unified::build::build_and_persist_graph(
255 &canonical_dir,
256 &self.plugin_manager,
257 &config,
258 "cli:auto_index",
259 )?;
260 let arc_graph = Arc::new(graph);
261
262 let mut cache = self.graph_cache.write();
263 *cache = Some((canonical_dir, Arc::clone(&arc_graph)));
264 Ok(Some(arc_graph))
265 }
266 }
267 }
268
269 #[must_use]
271 pub fn cache_stats(&self) -> (CacheStats, CacheStats) {
272 (self.ast_parse_cache.stats(), self.result_cache.stats())
273 }
274
275 pub fn get_query_plan(&self, query_str: &str) -> Result<QueryPlan> {
284 let start = Instant::now();
285
286 let parse_start = Instant::now();
288
289 let parsed = self.parse_query_ast(query_str)?;
290 let registry = self.build_registry();
291 let optimizer = crate::query::optimizer::Optimizer::new(registry);
292 let optimized_query = optimizer.optimize_query((*parsed.ast).clone());
293
294 let optimized_query_str = format!("{:?}", optimized_query.root);
295 let parse_step_name = "Parse query (boolean)";
296 let steps_prefix = vec![
297 (parse_step_name, 0),
298 ("Validate fields", 0),
299 ("Optimize AST", 0),
300 ];
301
302 let parse_time = parse_start
304 .elapsed()
305 .as_millis()
306 .try_into()
307 .unwrap_or(u64::MAX);
308
309 let (parse_stats, result_stats) = self.cache_stats();
311 let cache_status = CacheStatus {
312 parse_cache_hit: parse_stats.hits > 0,
313 result_cache_hit: result_stats.hits > 0,
314 };
315
316 let mut steps = Vec::new();
318 let mut step_num = 1;
319
320 for (operation, result_count) in steps_prefix {
321 steps.push(ExecutionStep {
322 step_num,
323 operation: operation.to_string(),
324 result_count,
325 time_ms: if step_num == 1 { parse_time } else { 0 },
326 });
327 step_num += 1;
328 }
329
330 steps.push(ExecutionStep {
332 step_num,
333 operation: "CodeGraph lookup".to_string(),
334 result_count: 0,
335 time_ms: 0,
336 });
337
338 let total_time = start.elapsed().as_millis().try_into().unwrap_or(u64::MAX);
340
341 Ok(QueryPlan::new(
342 query_str.to_string(),
343 optimized_query_str,
344 steps,
345 total_time,
346 true, cache_status,
348 ))
349 }
350
351 #[cfg(test)]
353 pub fn clear_caches(&self) {
354 self.ast_parse_cache.clear();
355 self.result_cache.clear();
356 }
357
358 pub fn parse_query_ast(&self, query_str: &str) -> Result<Arc<crate::query::ParsedQuery>> {
393 if let Some(cached_parsed) = self.ast_parse_cache.get(query_str) {
395 log::trace!("AST parse cache HIT for: {query_str}");
396 return Ok(cached_parsed);
397 }
398
399 log::trace!("AST parse cache MISS, parsing: {query_str}");
400
401 let ast = crate::query::parser_new::Parser::parse_query(query_str)
403 .map_err(|err| err.with_source(query_str))?;
404
405 let registry = self.build_registry();
406
407 let validator =
409 crate::query::validator::Validator::with_options(registry, self.validation_options);
410 let mut normalized_ast = ast.clone();
411 normalized_ast.root = match validator.normalize_expr(&ast.root) {
412 Ok(root) => root,
413 Err(validation_err) => {
414 let query_error = crate::query::error::QueryError::Validation(validation_err);
416 return Err(query_error.with_source(query_str).into());
417 }
418 };
419 if let Err(validation_err) = validator.validate(&normalized_ast.root) {
420 let query_error = crate::query::error::QueryError::Validation(validation_err);
422 return Err(query_error.with_source(query_str).into());
423 }
424
425 let parsed = crate::query::ParsedQuery::from_ast(Arc::new(normalized_ast))?;
427
428 let arc_parsed = Arc::new(parsed);
430 self.ast_parse_cache
431 .insert_arc(query_str.to_string(), Arc::clone(&arc_parsed));
432
433 Ok(arc_parsed)
434 }
435}
436
437impl Default for QueryExecutor {
438 fn default() -> Self {
439 Self::new()
440 }
441}
442
443impl QueryExecutor {
444 pub fn execute_on_graph(&self, query: &str, path: &Path) -> Result<QueryResults> {
475 self.execute_on_graph_with_variables(query, path, None)
476 }
477
478 pub fn execute_on_graph_with_variables(
488 &self,
489 query: &str,
490 path: &Path,
491 variables: Option<&HashMap<String, String>>,
492 ) -> Result<QueryResults> {
493 let parsed = self.parse_query_ast(query)?;
494 let graph = self
495 .get_or_load_graph(path)?
496 .ok_or_else(|| anyhow!("No graph found. Run `sqry index {}` first.", path.display()))?;
497
498 let workspace_root = path.canonicalize().unwrap_or_else(|_| path.to_path_buf());
499
500 self.execute_evaluate_with(graph, &parsed, &workspace_root, variables)
501 }
502
503 pub fn execute_on_preloaded_graph(
525 &self,
526 graph: Arc<CodeGraph>,
527 query: &str,
528 workspace_root: &Path,
529 variables: Option<&HashMap<String, String>>,
530 ) -> Result<QueryResults> {
531 let parsed = self.parse_query_ast(query)?;
532 let workspace_root = workspace_root
533 .canonicalize()
534 .unwrap_or_else(|_| workspace_root.to_path_buf());
535
536 self.execute_evaluate_with(graph, &parsed, &workspace_root, variables)
537 }
538
539 fn execute_evaluate_with(
547 &self,
548 graph: Arc<CodeGraph>,
549 parsed: &crate::query::ParsedQuery,
550 workspace_root: &Path,
551 variables: Option<&HashMap<String, String>>,
552 ) -> Result<QueryResults> {
553 let effective_root = if let Some(vars) = variables {
555 crate::query::types::resolve_variables(&parsed.ast.root, vars)
556 .map_err(|e| anyhow!("Variable resolution error: {e}"))?
557 } else {
558 parsed.ast.root.clone()
559 };
560
561 let mut ctx = graph_eval::GraphEvalContext::new(&graph, &self.plugin_manager)
562 .with_workspace_root(workspace_root)
563 .with_parallel_disabled(self.disable_parallel);
564
565 let matches = graph_eval::evaluate_all(&mut ctx, &effective_root)?;
566 let mut results =
567 QueryResults::new(graph, matches).with_workspace_root(workspace_root.to_path_buf());
568 results.sort_by_location();
569 Ok(results)
570 }
571
572 pub fn execute_join(
580 &self,
581 query: &str,
582 path: &Path,
583 variables: Option<&HashMap<String, String>>,
584 ) -> Result<JoinResults> {
585 let parsed = self.parse_query_ast(query)?;
586 let graph = self
587 .get_or_load_graph(path)?
588 .ok_or_else(|| anyhow!("No graph found. Run `sqry index {}` first.", path.display()))?;
589
590 let workspace_root = path.canonicalize().unwrap_or_else(|_| path.to_path_buf());
591
592 let effective_root = if let Some(vars) = variables {
594 crate::query::types::resolve_variables(&parsed.ast.root, vars)
595 .map_err(|e| anyhow!("Variable resolution error: {e}"))?
596 } else {
597 parsed.ast.root.clone()
598 };
599
600 let Expr::Join(join) = &effective_root else {
601 return Err(anyhow!(
602 "Expected a join expression (e.g., `(kind:function) CALLS (kind:function)`)"
603 ));
604 };
605
606 let ctx = graph_eval::GraphEvalContext::new(&graph, &self.plugin_manager)
607 .with_workspace_root(&workspace_root)
608 .with_parallel_disabled(self.disable_parallel);
609
610 let eval_result = graph_eval::evaluate_join(&ctx, join, None)?;
611 let results = JoinResults::new(
612 graph,
613 eval_result.pairs,
614 join.edge.clone(),
615 eval_result.truncated,
616 )
617 .with_workspace_root(workspace_root);
618 Ok(results)
619 }
620
621 pub fn execute_pipeline(
627 &self,
628 query: &str,
629 stages: &[PipelineStage],
630 path: &Path,
631 variables: Option<&HashMap<String, String>>,
632 ) -> Result<Vec<AggregationResult>> {
633 let results = self.execute_on_graph_with_variables(query, path, variables)?;
634
635 let mut aggregations = Vec::new();
636 for stage in stages {
637 aggregations.push(super::pipeline::execute_pipeline_stage(&results, stage));
638 }
639 Ok(aggregations)
640 }
641
642 pub fn execute_full(
650 &self,
651 query: &str,
652 path: &Path,
653 variables: Option<&HashMap<String, String>>,
654 ) -> Result<QueryOutput> {
655 let parsed = self.parse_query_ast(query)?;
656
657 if matches!(&parsed.ast.root, Expr::Join(_)) {
659 let join_results = self.execute_join(query, path, variables)?;
660 return Ok(QueryOutput::Join(join_results));
661 }
662
663 if let Some(pipeline) = crate::query::parser_new::Parser::parse_pipeline_query(query)
665 .map_err(|err| err.with_source(query))?
666 {
667 let aggregations = self.execute_pipeline(query, &pipeline.stages, path, variables)?;
668 if let Some(last) = aggregations.into_iter().last() {
670 return Ok(QueryOutput::Aggregation(last));
671 }
672 }
673
674 let results = self.execute_on_graph_with_variables(query, path, variables)?;
676 Ok(QueryOutput::Results(results))
677 }
678}