use super::graph_eval;
use crate::graph::unified::concurrent::CodeGraph;
use crate::graph::unified::persistence::load_from_path;
use crate::normalizer::MetadataNormalizer;
use crate::plugin::PluginManager;
use crate::query::cache::{CacheStats, ResultCache};
use crate::query::pipeline::AggregationResult;
use crate::query::plan::{CacheStatus, ExecutionStep, QueryPlan};
use crate::query::results::{JoinResults, QueryOutput, QueryResults};
use crate::query::types::{Expr, PipelineStage};
use anyhow::{Result, anyhow};
use parking_lot::RwLock;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Instant;
/// Shared, lock-guarded cache holding at most one loaded [`CodeGraph`],
/// keyed by the canonicalized workspace path it was loaded from.
/// `None` until the first successful load.
pub(crate) type GraphCache = Arc<RwLock<Option<(PathBuf, Arc<CodeGraph>)>>>;
/// Executes parsed queries against a persisted [`CodeGraph`].
///
/// Holds plugin hooks plus three caches (loaded graph, parsed query ASTs,
/// query results) so repeated queries against the same workspace stay cheap.
pub struct QueryExecutor {
    /// Plugins that contribute extra queryable fields and build behavior.
    pub(crate) plugin_manager: PluginManager,
    /// Most-recently loaded graph, keyed by canonicalized workspace path.
    pub(crate) graph_cache: GraphCache,
    /// Cache of parsed + validated query ASTs, keyed by raw query string.
    pub(crate) ast_parse_cache: Arc<crate::query::cache::AstParseCache>,
    /// Cache of query results (sized at construction).
    pub(crate) result_cache: Arc<ResultCache>,
    /// When true, graph evaluation runs without parallelism.
    pub(crate) disable_parallel: bool,
    /// Controls strictness of field validation during query parsing.
    pub(crate) validation_options: crate::query::validator::ValidationOptions,
}
impl QueryExecutor {
#[must_use]
pub fn new() -> Self {
Self {
plugin_manager: PluginManager::new(),
graph_cache: Arc::new(RwLock::new(None)),
ast_parse_cache: Arc::new(crate::query::cache::AstParseCache::new(1000)),
result_cache: Arc::new(ResultCache::new(1000)),
disable_parallel: false,
validation_options: crate::query::validator::ValidationOptions::default(),
}
}
#[must_use]
pub fn with_plugin_manager(plugin_manager: PluginManager) -> Self {
Self {
plugin_manager,
graph_cache: Arc::new(RwLock::new(None)),
ast_parse_cache: Arc::new(crate::query::cache::AstParseCache::new(1000)),
result_cache: Arc::new(ResultCache::new(1000)),
disable_parallel: false,
validation_options: crate::query::validator::ValidationOptions::default(),
}
}
#[must_use]
pub fn plugin_manager(&self) -> &PluginManager {
&self.plugin_manager
}
fn build_registry(&self) -> crate::query::registry::FieldRegistry {
let mut registry = crate::query::registry::FieldRegistry::with_core_fields();
for plugin in self.plugin_manager.plugins() {
let _collisions = registry.add_plugin_fields(plugin.fields());
}
let normalizer = MetadataNormalizer::new();
for (short_form, canonical) in normalizer.mappings() {
if registry.contains(canonical)
&& let Some(canonical_field) = registry.get(canonical)
{
let short_field = crate::query::types::FieldDescriptor {
name: short_form,
field_type: canonical_field.field_type.clone(),
operators: canonical_field.operators,
indexed: canonical_field.indexed,
doc: canonical_field.doc,
};
registry.add_field(short_field);
}
}
registry
}
#[must_use]
pub fn with_validation_options(
mut self,
options: crate::query::validator::ValidationOptions,
) -> Self {
self.validation_options = options;
self
}
#[must_use]
pub fn without_parallel(mut self) -> Self {
self.disable_parallel = true;
self
}
pub(crate) fn get_or_load_graph(&self, dir: &Path) -> Result<Option<Arc<CodeGraph>>> {
let canonical_dir = dir.canonicalize().unwrap_or_else(|_| dir.to_path_buf());
{
let cache = self.graph_cache.read();
if let Some((cached_path, graph)) = cache.as_ref()
&& cached_path == &canonical_dir
{
return Ok(Some(Arc::clone(graph)));
}
}
let mut cache = self.graph_cache.write();
if let Some((cached_path, graph)) = cache.as_ref() {
if cached_path == &canonical_dir {
return Ok(Some(Arc::clone(graph)));
}
log::debug!(
"Graph cache invalidated due to path mismatch. Old: {}, New: {}",
cached_path.display(),
canonical_dir.display()
);
}
let storage = crate::graph::unified::persistence::GraphStorage::new(&canonical_dir);
if !storage.exists() {
let auto_index_var = std::env::var("SQRY_AUTO_INDEX").unwrap_or_default();
if auto_index_var == "false" || auto_index_var == "0" {
*cache = None;
return Ok(None);
}
log::info!(
"No graph found at {}, auto-building index",
canonical_dir.display()
);
drop(cache);
let config = crate::graph::unified::build::BuildConfig::default();
let (graph, _build_result) = crate::graph::unified::build::build_and_persist_graph(
&canonical_dir,
&self.plugin_manager,
&config,
"cli:auto_index",
)?;
let arc_graph = Arc::new(graph);
let mut cache = self.graph_cache.write();
*cache = Some((canonical_dir, Arc::clone(&arc_graph)));
return Ok(Some(arc_graph));
}
log::debug!(
"Loading CodeGraph from: {}",
storage.snapshot_path().display()
);
match load_from_path(storage.snapshot_path(), Some(&self.plugin_manager)) {
Ok(graph) => {
let arc_graph = Arc::new(graph);
*cache = Some((canonical_dir, Arc::clone(&arc_graph)));
Ok(Some(arc_graph))
}
Err(e) => {
let auto_index_var = std::env::var("SQRY_AUTO_INDEX").unwrap_or_default();
if auto_index_var == "false" || auto_index_var == "0" {
return Err(e.into());
}
log::warn!("Graph load failed ({e}), auto-rebuilding index");
drop(cache);
let config = crate::graph::unified::build::BuildConfig::default();
let (graph, _build_result) = crate::graph::unified::build::build_and_persist_graph(
&canonical_dir,
&self.plugin_manager,
&config,
"cli:auto_index",
)?;
let arc_graph = Arc::new(graph);
let mut cache = self.graph_cache.write();
*cache = Some((canonical_dir, Arc::clone(&arc_graph)));
Ok(Some(arc_graph))
}
}
}
#[must_use]
pub fn cache_stats(&self) -> (CacheStats, CacheStats) {
(self.ast_parse_cache.stats(), self.result_cache.stats())
}
pub fn get_query_plan(&self, query_str: &str) -> Result<QueryPlan> {
let start = Instant::now();
let parse_start = Instant::now();
let parsed = self.parse_query_ast(query_str)?;
let registry = self.build_registry();
let optimizer = crate::query::optimizer::Optimizer::new(registry);
let optimized_query = optimizer.optimize_query((*parsed.ast).clone());
let optimized_query_str = format!("{:?}", optimized_query.root);
let parse_step_name = "Parse query (boolean)";
let steps_prefix = vec![
(parse_step_name, 0),
("Validate fields", 0),
("Optimize AST", 0),
];
let parse_time = parse_start
.elapsed()
.as_millis()
.try_into()
.unwrap_or(u64::MAX);
let (parse_stats, result_stats) = self.cache_stats();
let cache_status = CacheStatus {
parse_cache_hit: parse_stats.hits > 0,
result_cache_hit: result_stats.hits > 0,
};
let mut steps = Vec::new();
let mut step_num = 1;
for (operation, result_count) in steps_prefix {
steps.push(ExecutionStep {
step_num,
operation: operation.to_string(),
result_count,
time_ms: if step_num == 1 { parse_time } else { 0 },
});
step_num += 1;
}
steps.push(ExecutionStep {
step_num,
operation: "CodeGraph lookup".to_string(),
result_count: 0,
time_ms: 0,
});
let total_time = start.elapsed().as_millis().try_into().unwrap_or(u64::MAX);
Ok(QueryPlan::new(
query_str.to_string(),
optimized_query_str,
steps,
total_time,
true, cache_status,
))
}
#[cfg(test)]
pub fn clear_caches(&self) {
self.ast_parse_cache.clear();
self.result_cache.clear();
}
pub fn parse_query_ast(&self, query_str: &str) -> Result<Arc<crate::query::ParsedQuery>> {
if let Some(cached_parsed) = self.ast_parse_cache.get(query_str) {
log::trace!("AST parse cache HIT for: {query_str}");
return Ok(cached_parsed);
}
log::trace!("AST parse cache MISS, parsing: {query_str}");
let ast = crate::query::parser_new::Parser::parse_query(query_str)
.map_err(|err| err.with_source(query_str))?;
let registry = self.build_registry();
let validator =
crate::query::validator::Validator::with_options(registry, self.validation_options);
let mut normalized_ast = ast.clone();
normalized_ast.root = match validator.normalize_expr(&ast.root) {
Ok(root) => root,
Err(validation_err) => {
let query_error = crate::query::error::QueryError::Validation(validation_err);
return Err(query_error.with_source(query_str).into());
}
};
if let Err(validation_err) = validator.validate(&normalized_ast.root) {
let query_error = crate::query::error::QueryError::Validation(validation_err);
return Err(query_error.with_source(query_str).into());
}
let parsed = crate::query::ParsedQuery::from_ast(Arc::new(normalized_ast))?;
let arc_parsed = Arc::new(parsed);
self.ast_parse_cache
.insert_arc(query_str.to_string(), Arc::clone(&arc_parsed));
Ok(arc_parsed)
}
}
impl Default for QueryExecutor {
fn default() -> Self {
Self::new()
}
}
impl QueryExecutor {
pub fn execute_on_graph(&self, query: &str, path: &Path) -> Result<QueryResults> {
self.execute_on_graph_with_variables(query, path, None)
}
pub fn execute_on_graph_with_variables(
&self,
query: &str,
path: &Path,
variables: Option<&HashMap<String, String>>,
) -> Result<QueryResults> {
let parsed = self.parse_query_ast(query)?;
let graph = self
.get_or_load_graph(path)?
.ok_or_else(|| anyhow!("No graph found. Run `sqry index {}` first.", path.display()))?;
let workspace_root = path.canonicalize().unwrap_or_else(|_| path.to_path_buf());
let effective_root = if let Some(vars) = variables {
crate::query::types::resolve_variables(&parsed.ast.root, vars)
.map_err(|e| anyhow!("Variable resolution error: {e}"))?
} else {
parsed.ast.root.clone()
};
let mut ctx = graph_eval::GraphEvalContext::new(&graph, &self.plugin_manager)
.with_workspace_root(&workspace_root)
.with_parallel_disabled(self.disable_parallel);
for target in graph_eval::collect_import_targets(&effective_root) {
ctx.precompute_imports(&target);
}
let matches = graph_eval::evaluate_all(&mut ctx, &effective_root)?;
let mut results = QueryResults::new(graph, matches).with_workspace_root(workspace_root);
results.sort_by_location();
Ok(results)
}
pub fn execute_join(
&self,
query: &str,
path: &Path,
variables: Option<&HashMap<String, String>>,
) -> Result<JoinResults> {
let parsed = self.parse_query_ast(query)?;
let graph = self
.get_or_load_graph(path)?
.ok_or_else(|| anyhow!("No graph found. Run `sqry index {}` first.", path.display()))?;
let workspace_root = path.canonicalize().unwrap_or_else(|_| path.to_path_buf());
let effective_root = if let Some(vars) = variables {
crate::query::types::resolve_variables(&parsed.ast.root, vars)
.map_err(|e| anyhow!("Variable resolution error: {e}"))?
} else {
parsed.ast.root.clone()
};
let Expr::Join(join) = &effective_root else {
return Err(anyhow!(
"Expected a join expression (e.g., `(kind:function) CALLS (kind:function)`)"
));
};
let ctx = graph_eval::GraphEvalContext::new(&graph, &self.plugin_manager)
.with_workspace_root(&workspace_root)
.with_parallel_disabled(self.disable_parallel);
let eval_result = graph_eval::evaluate_join(&ctx, join, None)?;
let results = JoinResults::new(
graph,
eval_result.pairs,
join.edge.clone(),
eval_result.truncated,
)
.with_workspace_root(workspace_root);
Ok(results)
}
pub fn execute_pipeline(
&self,
query: &str,
stages: &[PipelineStage],
path: &Path,
variables: Option<&HashMap<String, String>>,
) -> Result<Vec<AggregationResult>> {
let results = self.execute_on_graph_with_variables(query, path, variables)?;
let mut aggregations = Vec::new();
for stage in stages {
aggregations.push(super::pipeline::execute_pipeline_stage(&results, stage));
}
Ok(aggregations)
}
pub fn execute_full(
&self,
query: &str,
path: &Path,
variables: Option<&HashMap<String, String>>,
) -> Result<QueryOutput> {
let parsed = self.parse_query_ast(query)?;
if matches!(&parsed.ast.root, Expr::Join(_)) {
let join_results = self.execute_join(query, path, variables)?;
return Ok(QueryOutput::Join(join_results));
}
if let Some(pipeline) = crate::query::parser_new::Parser::parse_pipeline_query(query)
.map_err(|err| err.with_source(query))?
{
let aggregations = self.execute_pipeline(query, &pipeline.stages, path, variables)?;
if let Some(last) = aggregations.into_iter().last() {
return Ok(QueryOutput::Aggregation(last));
}
}
let results = self.execute_on_graph_with_variables(query, path, variables)?;
Ok(QueryOutput::Results(results))
}
}