sqry_core/query/executor/
core.rs1use super::graph_eval;
33use crate::graph::unified::concurrent::CodeGraph;
34use crate::graph::unified::persistence::load_from_path;
35use crate::normalizer::MetadataNormalizer;
36use crate::plugin::PluginManager;
37use crate::query::cache::{CacheStats, ResultCache};
38use crate::query::pipeline::AggregationResult;
39use crate::query::plan::{CacheStatus, ExecutionStep, QueryPlan};
40use crate::query::results::{JoinResults, QueryOutput, QueryResults};
41use crate::query::types::{Expr, PipelineStage};
42use anyhow::{Result, anyhow};
43use parking_lot::RwLock;
44use std::collections::HashMap;
45use std::path::{Path, PathBuf};
46use std::sync::Arc;
47use std::time::Instant;
48
49pub(crate) type GraphCache = Arc<RwLock<Option<(PathBuf, Arc<CodeGraph>)>>>;
54
55pub struct QueryExecutor {
60 pub(crate) plugin_manager: PluginManager,
61
62 pub(crate) graph_cache: GraphCache,
66
67 pub(crate) ast_parse_cache: Arc<crate::query::cache::AstParseCache>,
69
70 pub(crate) result_cache: Arc<ResultCache>,
72
73 pub(crate) disable_parallel: bool,
75
76 pub(crate) validation_options: crate::query::validator::ValidationOptions,
78}
79
80impl QueryExecutor {
81 #[must_use]
83 pub fn new() -> Self {
84 Self {
85 plugin_manager: PluginManager::new(),
86 graph_cache: Arc::new(RwLock::new(None)),
87 ast_parse_cache: Arc::new(crate::query::cache::AstParseCache::new(1000)),
88 result_cache: Arc::new(ResultCache::new(1000)),
89 disable_parallel: false,
90 validation_options: crate::query::validator::ValidationOptions::default(),
91 }
92 }
93
94 #[must_use]
96 pub fn with_plugin_manager(plugin_manager: PluginManager) -> Self {
97 Self {
98 plugin_manager,
99 graph_cache: Arc::new(RwLock::new(None)),
100 ast_parse_cache: Arc::new(crate::query::cache::AstParseCache::new(1000)),
101 result_cache: Arc::new(ResultCache::new(1000)),
102 disable_parallel: false,
103 validation_options: crate::query::validator::ValidationOptions::default(),
104 }
105 }
106
107 #[must_use]
109 pub fn plugin_manager(&self) -> &PluginManager {
110 &self.plugin_manager
111 }
112
113 fn build_registry(&self) -> crate::query::registry::FieldRegistry {
114 let mut registry = crate::query::registry::FieldRegistry::with_core_fields();
115 for plugin in self.plugin_manager.plugins() {
116 let _collisions = registry.add_plugin_fields(plugin.fields());
117 }
118
119 let normalizer = MetadataNormalizer::new();
120 for (short_form, canonical) in normalizer.mappings() {
121 if registry.contains(canonical)
122 && let Some(canonical_field) = registry.get(canonical)
123 {
124 let short_field = crate::query::types::FieldDescriptor {
125 name: short_form,
126 field_type: canonical_field.field_type.clone(),
127 operators: canonical_field.operators,
128 indexed: canonical_field.indexed,
129 doc: canonical_field.doc,
130 };
131 registry.add_field(short_field);
132 }
133 }
134
135 registry
136 }
137
138 #[must_use]
140 pub fn with_validation_options(
141 mut self,
142 options: crate::query::validator::ValidationOptions,
143 ) -> Self {
144 self.validation_options = options;
145 self
146 }
147
148 #[must_use]
150 pub fn without_parallel(mut self) -> Self {
151 self.disable_parallel = true;
152 self
153 }
154
155 pub(crate) fn get_or_load_graph(&self, dir: &Path) -> Result<Option<Arc<CodeGraph>>> {
169 let canonical_dir = dir.canonicalize().unwrap_or_else(|_| dir.to_path_buf());
171
172 {
174 let cache = self.graph_cache.read();
175 if let Some((cached_path, graph)) = cache.as_ref()
176 && cached_path == &canonical_dir
177 {
178 return Ok(Some(Arc::clone(graph)));
179 }
180 }
182
183 let mut cache = self.graph_cache.write();
185
186 if let Some((cached_path, graph)) = cache.as_ref() {
188 if cached_path == &canonical_dir {
189 return Ok(Some(Arc::clone(graph)));
190 }
191 log::debug!(
193 "Graph cache invalidated due to path mismatch. Old: {}, New: {}",
194 cached_path.display(),
195 canonical_dir.display()
196 );
197 }
198
199 let storage = crate::graph::unified::persistence::GraphStorage::new(&canonical_dir);
201
202 if !storage.exists() {
203 let auto_index_var = std::env::var("SQRY_AUTO_INDEX").unwrap_or_default();
205 if auto_index_var == "false" || auto_index_var == "0" {
206 *cache = None;
207 return Ok(None);
208 }
209
210 log::info!(
211 "No graph found at {}, auto-building index",
212 canonical_dir.display()
213 );
214 drop(cache);
216
217 let config = crate::graph::unified::build::BuildConfig::default();
218 let (graph, _build_result) = crate::graph::unified::build::build_and_persist_graph(
219 &canonical_dir,
220 &self.plugin_manager,
221 &config,
222 "cli:auto_index",
223 )?;
224 let arc_graph = Arc::new(graph);
225
226 let mut cache = self.graph_cache.write();
227 *cache = Some((canonical_dir, Arc::clone(&arc_graph)));
228 return Ok(Some(arc_graph));
229 }
230
231 log::debug!(
233 "Loading CodeGraph from: {}",
234 storage.snapshot_path().display()
235 );
236
237 match load_from_path(storage.snapshot_path(), Some(&self.plugin_manager)) {
238 Ok(graph) => {
239 let arc_graph = Arc::new(graph);
240 *cache = Some((canonical_dir, Arc::clone(&arc_graph)));
241 Ok(Some(arc_graph))
242 }
243 Err(e) => {
244 let auto_index_var = std::env::var("SQRY_AUTO_INDEX").unwrap_or_default();
246 if auto_index_var == "false" || auto_index_var == "0" {
247 return Err(e.into());
248 }
249 log::warn!("Graph load failed ({e}), auto-rebuilding index");
250 drop(cache);
252
253 let config = crate::graph::unified::build::BuildConfig::default();
254 let (graph, _build_result) = crate::graph::unified::build::build_and_persist_graph(
255 &canonical_dir,
256 &self.plugin_manager,
257 &config,
258 "cli:auto_index",
259 )?;
260 let arc_graph = Arc::new(graph);
261
262 let mut cache = self.graph_cache.write();
263 *cache = Some((canonical_dir, Arc::clone(&arc_graph)));
264 Ok(Some(arc_graph))
265 }
266 }
267 }
268
269 #[must_use]
271 pub fn cache_stats(&self) -> (CacheStats, CacheStats) {
272 (self.ast_parse_cache.stats(), self.result_cache.stats())
273 }
274
275 pub fn get_query_plan(&self, query_str: &str) -> Result<QueryPlan> {
284 let start = Instant::now();
285
286 let parse_start = Instant::now();
288
289 let parsed = self.parse_query_ast(query_str)?;
290 let registry = self.build_registry();
291 let optimizer = crate::query::optimizer::Optimizer::new(registry);
292 let optimized_query = optimizer.optimize_query((*parsed.ast).clone());
293
294 let optimized_query_str = format!("{:?}", optimized_query.root);
295 let parse_step_name = "Parse query (boolean)";
296 let steps_prefix = vec![
297 (parse_step_name, 0),
298 ("Validate fields", 0),
299 ("Optimize AST", 0),
300 ];
301
302 let parse_time = parse_start
304 .elapsed()
305 .as_millis()
306 .try_into()
307 .unwrap_or(u64::MAX);
308
309 let (parse_stats, result_stats) = self.cache_stats();
311 let cache_status = CacheStatus {
312 parse_cache_hit: parse_stats.hits > 0,
313 result_cache_hit: result_stats.hits > 0,
314 };
315
316 let mut steps = Vec::new();
318 let mut step_num = 1;
319
320 for (operation, result_count) in steps_prefix {
321 steps.push(ExecutionStep {
322 step_num,
323 operation: operation.to_string(),
324 result_count,
325 time_ms: if step_num == 1 { parse_time } else { 0 },
326 });
327 step_num += 1;
328 }
329
330 steps.push(ExecutionStep {
332 step_num,
333 operation: "CodeGraph lookup".to_string(),
334 result_count: 0,
335 time_ms: 0,
336 });
337
338 let total_time = start.elapsed().as_millis().try_into().unwrap_or(u64::MAX);
340
341 Ok(QueryPlan::new(
342 query_str.to_string(),
343 optimized_query_str,
344 steps,
345 total_time,
346 true, cache_status,
348 ))
349 }
350
351 #[cfg(test)]
353 pub fn clear_caches(&self) {
354 self.ast_parse_cache.clear();
355 self.result_cache.clear();
356 }
357
358 pub fn parse_query_ast(&self, query_str: &str) -> Result<Arc<crate::query::ParsedQuery>> {
393 if let Some(cached_parsed) = self.ast_parse_cache.get(query_str) {
395 log::trace!("AST parse cache HIT for: {query_str}");
396 return Ok(cached_parsed);
397 }
398
399 log::trace!("AST parse cache MISS, parsing: {query_str}");
400
401 let ast = crate::query::parser_new::Parser::parse_query(query_str)
403 .map_err(|err| err.with_source(query_str))?;
404
405 let registry = self.build_registry();
406
407 let validator =
409 crate::query::validator::Validator::with_options(registry, self.validation_options);
410 let mut normalized_ast = ast.clone();
411 normalized_ast.root = match validator.normalize_expr(&ast.root) {
412 Ok(root) => root,
413 Err(validation_err) => {
414 let query_error = crate::query::error::QueryError::Validation(validation_err);
416 return Err(query_error.with_source(query_str).into());
417 }
418 };
419 if let Err(validation_err) = validator.validate(&normalized_ast.root) {
420 let query_error = crate::query::error::QueryError::Validation(validation_err);
422 return Err(query_error.with_source(query_str).into());
423 }
424
425 let parsed = crate::query::ParsedQuery::from_ast(Arc::new(normalized_ast))?;
427
428 let arc_parsed = Arc::new(parsed);
430 self.ast_parse_cache
431 .insert_arc(query_str.to_string(), Arc::clone(&arc_parsed));
432
433 Ok(arc_parsed)
434 }
435}
436
437impl Default for QueryExecutor {
438 fn default() -> Self {
439 Self::new()
440 }
441}
442
443impl QueryExecutor {
444 pub fn execute_on_graph(&self, query: &str, path: &Path) -> Result<QueryResults> {
475 self.execute_on_graph_with_variables(query, path, None)
476 }
477
478 pub fn execute_on_graph_with_variables(
488 &self,
489 query: &str,
490 path: &Path,
491 variables: Option<&HashMap<String, String>>,
492 ) -> Result<QueryResults> {
493 let parsed = self.parse_query_ast(query)?;
494 let graph = self
495 .get_or_load_graph(path)?
496 .ok_or_else(|| anyhow!("No graph found. Run `sqry index {}` first.", path.display()))?;
497
498 let workspace_root = path.canonicalize().unwrap_or_else(|_| path.to_path_buf());
499
500 let effective_root = if let Some(vars) = variables {
502 crate::query::types::resolve_variables(&parsed.ast.root, vars)
503 .map_err(|e| anyhow!("Variable resolution error: {e}"))?
504 } else {
505 parsed.ast.root.clone()
506 };
507
508 let mut ctx = graph_eval::GraphEvalContext::new(&graph, &self.plugin_manager)
509 .with_workspace_root(&workspace_root)
510 .with_parallel_disabled(self.disable_parallel);
511
512 for target in graph_eval::collect_import_targets(&effective_root) {
514 ctx.precompute_imports(&target);
515 }
516
517 let matches = graph_eval::evaluate_all(&mut ctx, &effective_root)?;
518 let mut results = QueryResults::new(graph, matches).with_workspace_root(workspace_root);
519 results.sort_by_location();
520 Ok(results)
521 }
522
523 pub fn execute_join(
531 &self,
532 query: &str,
533 path: &Path,
534 variables: Option<&HashMap<String, String>>,
535 ) -> Result<JoinResults> {
536 let parsed = self.parse_query_ast(query)?;
537 let graph = self
538 .get_or_load_graph(path)?
539 .ok_or_else(|| anyhow!("No graph found. Run `sqry index {}` first.", path.display()))?;
540
541 let workspace_root = path.canonicalize().unwrap_or_else(|_| path.to_path_buf());
542
543 let effective_root = if let Some(vars) = variables {
545 crate::query::types::resolve_variables(&parsed.ast.root, vars)
546 .map_err(|e| anyhow!("Variable resolution error: {e}"))?
547 } else {
548 parsed.ast.root.clone()
549 };
550
551 let Expr::Join(join) = &effective_root else {
552 return Err(anyhow!(
553 "Expected a join expression (e.g., `(kind:function) CALLS (kind:function)`)"
554 ));
555 };
556
557 let ctx = graph_eval::GraphEvalContext::new(&graph, &self.plugin_manager)
558 .with_workspace_root(&workspace_root)
559 .with_parallel_disabled(self.disable_parallel);
560
561 let eval_result = graph_eval::evaluate_join(&ctx, join, None)?;
562 let results = JoinResults::new(
563 graph,
564 eval_result.pairs,
565 join.edge.clone(),
566 eval_result.truncated,
567 )
568 .with_workspace_root(workspace_root);
569 Ok(results)
570 }
571
572 pub fn execute_pipeline(
578 &self,
579 query: &str,
580 stages: &[PipelineStage],
581 path: &Path,
582 variables: Option<&HashMap<String, String>>,
583 ) -> Result<Vec<AggregationResult>> {
584 let results = self.execute_on_graph_with_variables(query, path, variables)?;
585
586 let mut aggregations = Vec::new();
587 for stage in stages {
588 aggregations.push(super::pipeline::execute_pipeline_stage(&results, stage));
589 }
590 Ok(aggregations)
591 }
592
593 pub fn execute_full(
601 &self,
602 query: &str,
603 path: &Path,
604 variables: Option<&HashMap<String, String>>,
605 ) -> Result<QueryOutput> {
606 let parsed = self.parse_query_ast(query)?;
607
608 if matches!(&parsed.ast.root, Expr::Join(_)) {
610 let join_results = self.execute_join(query, path, variables)?;
611 return Ok(QueryOutput::Join(join_results));
612 }
613
614 if let Some(pipeline) = crate::query::parser_new::Parser::parse_pipeline_query(query)
616 .map_err(|err| err.with_source(query))?
617 {
618 let aggregations = self.execute_pipeline(query, &pipeline.stages, path, variables)?;
619 if let Some(last) = aggregations.into_iter().last() {
621 return Ok(QueryOutput::Aggregation(last));
622 }
623 }
624
625 let results = self.execute_on_graph_with_variables(query, path, variables)?;
627 Ok(QueryOutput::Results(results))
628 }
629}