Skip to main content

sqry_cli/commands/
query.rs

1// RKG: CODE:SQRY-CLI implements REQ:SQRY-RUBY-QUALIFIED-CALLERS
2//! Query command implementation
3
4use crate::args::Cli;
5use crate::index_discovery::{augment_query_with_scope, find_nearest_index};
6use crate::output::{
7    DisplaySymbol, OutputStreams, call_identity_from_qualified_name, create_formatter,
8};
9use crate::plugin_defaults::{self, PluginSelectionMode};
10use anyhow::{Context, Result, bail};
11use sqry_core::query::QueryExecutor;
12use sqry_core::query::parser_new::Parser as QueryParser;
13use sqry_core::query::results::QueryResults;
14use sqry_core::query::security::QuerySecurityConfig;
15use sqry_core::query::types::{Expr, Value};
16use sqry_core::query::validator::ValidationOptions;
17use sqry_core::relations::CallIdentityMetadata;
18use sqry_core::search::Match as TextMatch;
19use sqry_core::search::classifier::{QueryClassifier, QueryType};
20use sqry_core::search::fallback::{FallbackConfig, FallbackSearchEngine, SearchResults};
21use sqry_core::session::{SessionManager, SessionStats};
22use std::env;
23use std::path::{Path, PathBuf};
24use std::sync::Mutex;
25use std::time::{Duration, Instant};
26
/// Process-wide slot holding the session manager used by `--session` queries.
///
/// The slot starts out as `None`; `run_query_with_session` creates the
/// `SessionManager` the first time it finds the slot empty and reuses it on
/// later calls. Access is serialized through the surrounding `Mutex`.
static QUERY_SESSION: std::sync::LazyLock<Mutex<Option<SessionManager>>> =
    std::sync::LazyLock::new(|| Mutex::new(None));

/// Number of symbols `render_semantic_results` keeps when `cli.limit` is unset.
const DEFAULT_QUERY_LIMIT: usize = 1000;
31
/// Simple query statistics for CLI display (replaces `sqry_core::query::QueryStats`).
///
/// Currently carries a single flag; the `Default` value reports that no
/// graph/index was used.
#[derive(Debug, Clone, Default)]
struct SimpleQueryStats {
    /// Whether a graph/index was used to answer the query.
    used_index: bool,
}
38
39/// Convert `QueryResults` to `Vec<DisplaySymbol>` for display purposes.
40///
41/// This creates `DisplaySymbol` structs directly from `QueryMatch`,
42/// avoiding the deprecated Symbol intermediate type.
43fn query_results_to_display_symbols(results: &QueryResults) -> Vec<DisplaySymbol> {
44    results
45        .iter()
46        .map(|m| DisplaySymbol::from_query_match(&m))
47        .collect()
48}
49
/// Result of a query that still has to be rendered as semantic output.
struct QueryExecution {
    /// Statistics shown in the query summary.
    stats: SimpleQueryStats,
    /// Symbols produced by the query, ready for sorting, limiting and formatting.
    symbols: Vec<DisplaySymbol>,
    /// Executor that ran the query, when it is kept for the rendering stage;
    /// the hybrid path stores `None`, the semantic path stores `Some`.
    executor: Option<QueryExecutor>,
}
55
/// Tells the caller whether anything is left to render after execution.
enum QueryExecutionOutcome {
    /// Output was already written during execution (e.g. text matches from the
    /// hybrid path); nothing more to render.
    Terminal,
    /// Execution produced symbols that still need semantic rendering.
    /// Boxed, so the enum itself stays small.
    Continue(Box<QueryExecution>),
}
60
/// Inputs for `run_query_non_session`, bundled to keep its signature short.
struct NonSessionQueryParams<'a> {
    /// Parsed CLI arguments.
    cli: &'a Cli,
    /// Query text exactly as passed to `run_query`.
    query_string: &'a str,
    /// Path to search (file or directory), as passed to `run_query`.
    search_path: &'a str,
    /// Validation settings derived from the CLI by `build_validation_options`.
    validation_options: ValidationOptions,
    /// Emit verbose diagnostics when true.
    verbose: bool,
    /// Disable parallel query execution when true.
    no_parallel: bool,
    /// Relation display context built from the query string.
    relation_context: &'a RelationDisplayContext,
    /// Parsed `--var KEY=VALUE` pairs; `None` when no variables were supplied.
    variables: Option<&'a std::collections::HashMap<String, String>>,
}
71
/// Inputs for `execute_query_mode`.
///
/// Built by `run_query_non_session` after index discovery, so `query_string`
/// and `search_path` are the effective (possibly scope-augmented) values.
struct QueryExecutionParams<'a> {
    /// Parsed CLI arguments.
    cli: &'a Cli,
    /// Effective query text (scope filter already applied when required).
    query_string: &'a str,
    /// Effective index root, or the original search path when no index was found.
    search_path: &'a Path,
    /// Validation settings forwarded to the executor.
    validation_options: ValidationOptions,
    /// Disable parallel query execution when true.
    no_parallel: bool,
    /// Instant captured just before execution; used to compute elapsed time.
    start: Instant,
    /// Classification of the effective query from `QueryClassifier::classify`.
    query_type: QueryType,
    /// Parsed `--var KEY=VALUE` pairs; `None` when no variables were supplied.
    variables: Option<&'a std::collections::HashMap<String, String>>,
}
82
/// Inputs for `render_query_outcome`.
struct QueryRenderParams<'a> {
    /// Parsed CLI arguments.
    cli: &'a Cli,
    /// Effective query text reported in the rendered metadata.
    query_string: &'a str,
    /// Emit verbose diagnostics when true.
    verbose: bool,
    /// Instant captured before execution; elapsed time is measured from it.
    start: Instant,
    /// Relation display context built from the original query string.
    relation_context: &'a RelationDisplayContext,
    /// Which index was used and which scope filter (if any) was applied.
    index_info: IndexDiagnosticInfo,
}
91
/// Inputs for `execute_hybrid_query`.
///
/// Field-for-field the same data as `QueryExecutionParams`;
/// `execute_query_mode` copies it over when hybrid search is selected.
struct HybridQueryParams<'a> {
    /// Parsed CLI arguments.
    cli: &'a Cli,
    /// Effective query text.
    query_string: &'a str,
    /// Effective index root / search path.
    search_path: &'a Path,
    /// Validation settings forwarded to the executor.
    validation_options: ValidationOptions,
    /// Disable parallel query execution when true.
    no_parallel: bool,
    /// Instant captured before execution; elapsed time is measured from it.
    start: Instant,
    /// Query classification, used for the search-mode diagnostic.
    query_type: QueryType,
    /// Parsed `--var KEY=VALUE` pairs; `None` when no variables were supplied.
    variables: Option<&'a std::collections::HashMap<String, String>>,
}
102
103/// Run a query command to search for symbols using AST-aware predicates
104///
105/// # Arguments
106///
107/// * `cli` - CLI arguments
108/// * `query_string` - Query string with predicates (e.g., "kind:function AND name~=/test/")
109/// * `search_path` - Path to search (file or directory)
110/// * `explain` - If true, explain the query instead of executing it
111/// * `verbose` - If true, show verbose output including cache statistics
112/// * `session_mode` - If true, use persistent session for repeated queries
113/// * `no_parallel` - If true, disable parallel query execution (for A/B testing)
114/// * `timeout_secs` - Query timeout in seconds (max 30s per security policy)
115/// * `result_limit` - Maximum number of results to return
116///
117/// # Errors
118/// Returns an error if query validation fails, execution fails, or output cannot be written.
119///
120/// # STEP_8 precedence
121///
122/// `search_path` must be resolved by the caller via
123/// [`crate::args::Cli::resolve_subcommand_path`] so that an explicit positional
124/// `<path>` argument wins over the global `--workspace` /
125/// `SQRY_WORKSPACE_FILE` flag.
126#[allow(clippy::too_many_arguments)]
127#[allow(clippy::fn_params_excessive_bools)] // CLI flags map directly to booleans.
128pub fn run_query(
129    cli: &Cli,
130    query_string: &str,
131    search_path: &str,
132    explain: bool,
133    verbose: bool,
134    session_mode: bool,
135    no_parallel: bool,
136    timeout_secs: Option<u64>,
137    result_limit: Option<usize>,
138    variables: &[String],
139) -> Result<()> {
140    // Create output streams with optional pager support
141    let mut streams = OutputStreams::with_pager(cli.pager_config());
142
143    ensure_repo_predicate_not_present(query_string)?;
144
145    let validation_options = build_validation_options(cli);
146
147    // Build security config from CLI flags (30s ceiling is enforced by QuerySecurityConfig)
148    let security_config = build_security_config(timeout_secs, result_limit);
149    maybe_emit_security_diagnostics(&mut streams, &security_config, verbose)?;
150
151    // NOTE: Security enforcement via QueryGuard will be integrated into QueryExecutor
152    // in a future enhancement. For now, the config is built and validated.
153    let _ = &security_config; // Silence unused warning until full integration
154
155    // Parse --var KEY=VALUE pairs into a variables map for the executor
156    let parsed_variables = parse_variable_args(variables)?;
157    let variables_opt = if parsed_variables.is_empty() {
158        None
159    } else {
160        Some(&parsed_variables)
161    };
162
163    // Check for pipeline queries (base query | stage)
164    if let Some(pipeline) = detect_pipeline_query(query_string)? {
165        run_pipeline_query(
166            cli,
167            &mut streams,
168            query_string,
169            search_path,
170            &pipeline,
171            no_parallel,
172            variables_opt,
173        )?;
174        return streams.finish_checked();
175    }
176
177    // Check for join queries (LHS CALLS RHS)
178    if is_join_query(query_string) {
179        run_join_query(
180            cli,
181            &mut streams,
182            query_string,
183            search_path,
184            no_parallel,
185            variables_opt,
186        )?;
187        return streams.finish_checked();
188    }
189
190    // If explain mode, use get_query_plan for detailed output (semantic only)
191    if explain {
192        run_query_explain(query_string, validation_options, no_parallel, &mut streams)?;
193        return streams.finish_checked();
194    }
195
196    let relation_context = RelationDisplayContext::from_query(query_string);
197
198    // IMPORTANT: Check session mode FIRST, before any index loading
199    // This allows session queries to short-circuit directly to the cached executor
200    // (fixes CODEX MEDIUM-2: session mode was validating before checking cache)
201    // RKG: CODE:SQRY-CLI implements REQ:SQRY-RUBY-QUALIFIED-CALLERS (MEDIUM-2 fix)
202    if session_mode {
203        let result = run_query_with_session(
204            cli,
205            &mut streams,
206            query_string,
207            search_path,
208            verbose,
209            no_parallel,
210            &relation_context,
211        );
212        // Check result first, then finalize pager
213        // If the query failed, return that error; otherwise check pager status
214        result?;
215        return streams.finish_checked();
216    }
217
218    let params = NonSessionQueryParams {
219        cli,
220        query_string,
221        search_path,
222        validation_options,
223        verbose,
224        no_parallel,
225        relation_context: &relation_context,
226        variables: variables_opt,
227    };
228    run_query_non_session(&mut streams, &params)?;
229
230    // Finalize pager (flushes buffer, waits for pager if spawned, propagates exit code)
231    streams.finish_checked()
232}
233
234fn build_validation_options(cli: &Cli) -> ValidationOptions {
235    ValidationOptions {
236        fuzzy_fields: cli.fuzzy_fields,
237        fuzzy_field_distance: cli.fuzzy_field_distance,
238    }
239}
240
241fn build_security_config(
242    timeout_secs: Option<u64>,
243    result_limit: Option<usize>,
244) -> QuerySecurityConfig {
245    let mut config = QuerySecurityConfig::default();
246    if let Some(secs) = timeout_secs {
247        config = config.with_timeout(Duration::from_secs(secs));
248    }
249    if let Some(limit) = result_limit {
250        config = config.with_result_cap(limit);
251    }
252    config
253}
254
255fn maybe_emit_security_diagnostics(
256    streams: &mut OutputStreams,
257    security_config: &QuerySecurityConfig,
258    verbose: bool,
259) -> Result<()> {
260    if verbose {
261        streams.write_diagnostic(&format!(
262            "[Security] timeout={}s, limit={}, memory={}MB",
263            security_config.timeout().as_secs(),
264            security_config.result_cap(),
265            security_config.memory_limit() / (1024 * 1024),
266        ))?;
267    }
268    Ok(())
269}
270
271fn run_query_explain(
272    query_string: &str,
273    validation_options: ValidationOptions,
274    no_parallel: bool,
275    streams: &mut OutputStreams,
276) -> Result<()> {
277    let mut executor = create_executor_with_plugins().with_validation_options(validation_options);
278    if no_parallel {
279        executor = executor.without_parallel();
280    }
281    let plan = executor.get_query_plan(query_string)?;
282    let explain_output = format!(
283        "Query Plan:\n  Original: {}\n  Optimized: {}\n\nExecution:\n{}\n\nPerformance:\n  Execution time: {}ms\n  Index-aware: {}\n  Cache: {}",
284        plan.original_query,
285        plan.optimized_query,
286        format_execution_steps(&plan.steps),
287        plan.execution_time_ms,
288        if plan.used_index { "Yes" } else { "No" },
289        format_cache_status(&plan.cache_status),
290    );
291    streams.write_diagnostic(&explain_output)?;
292    Ok(())
293}
294
/// Resolved effective index root, augmented query, and diagnostic info.
///
/// Produced by `resolve_effective_index_root`.
struct EffectiveIndexResolution {
    /// Root of the discovered index, or the original search path when no index was found.
    index_root: PathBuf,
    /// Query to execute: the original text, or the scope-augmented text when a
    /// scope filter is required.
    query: String,
    /// Diagnostic description of the index and scope filter that were chosen.
    info: IndexDiagnosticInfo,
}
301
302/// Walk up the directory tree to find the nearest index, determine the effective
303/// index root, augment the query with scope filters if needed, and build diagnostic info.
304fn resolve_effective_index_root(
305    search_path: &Path,
306    query_string: &str,
307) -> EffectiveIndexResolution {
308    let index_location = find_nearest_index(search_path);
309
310    if let Some(ref loc) = index_location {
311        let root = loc.index_root.clone();
312        let (query, filtered_to) = if loc.requires_scope_filter {
313            if let Some(relative_scope) = loc.relative_scope() {
314                let scope_str = if loc.is_file_query {
315                    relative_scope.to_string_lossy().into_owned()
316                } else {
317                    format!("{}/**", relative_scope.display())
318                };
319                let augmented =
320                    augment_query_with_scope(query_string, &relative_scope, loc.is_file_query);
321                (augmented, Some(scope_str))
322            } else {
323                (query_string.to_string(), None)
324            }
325        } else {
326            (query_string.to_string(), None)
327        };
328        let info = IndexDiagnosticInfo {
329            index_root: Some(root.clone()),
330            filtered_to,
331            used_ancestor_index: loc.is_ancestor,
332        };
333        EffectiveIndexResolution {
334            index_root: root,
335            query,
336            info,
337        }
338    } else {
339        EffectiveIndexResolution {
340            index_root: search_path.to_path_buf(),
341            query: query_string.to_string(),
342            info: IndexDiagnosticInfo::default(),
343        }
344    }
345}
346
347fn run_query_non_session(
348    streams: &mut OutputStreams,
349    params: &NonSessionQueryParams<'_>,
350) -> Result<()> {
351    let NonSessionQueryParams {
352        cli,
353        query_string,
354        search_path,
355        validation_options,
356        verbose,
357        no_parallel,
358        relation_context,
359        variables,
360    } = *params;
361    let search_path_path = Path::new(search_path);
362
363    // Index ancestor discovery: find nearest .sqry-index in directory tree
364    let resolution = resolve_effective_index_root(search_path_path, query_string);
365    let EffectiveIndexResolution {
366        index_root: effective_index_root,
367        query: effective_query,
368        info: index_info,
369    } = resolution;
370
371    let query_type = QueryClassifier::classify(&effective_query);
372
373    let start = Instant::now();
374    let execution_params = QueryExecutionParams {
375        cli,
376        query_string: &effective_query,
377        search_path: &effective_index_root,
378        validation_options,
379        no_parallel,
380        start,
381        query_type,
382        variables,
383    };
384    let outcome = execute_query_mode(streams, &execution_params)?;
385    let render_params = QueryRenderParams {
386        cli,
387        query_string: &effective_query,
388        verbose,
389        start,
390        relation_context,
391        index_info,
392    };
393    render_query_outcome(streams, outcome, render_params)
394}
395
396fn execute_query_mode(
397    streams: &mut OutputStreams,
398    params: &QueryExecutionParams<'_>,
399) -> Result<QueryExecutionOutcome> {
400    let cli = params.cli;
401    let query_string = params.query_string;
402    let search_path = params.search_path;
403    let validation_options = params.validation_options;
404    let no_parallel = params.no_parallel;
405    let start = params.start;
406    let query_type = params.query_type;
407    let variables = params.variables;
408
409    if should_use_hybrid_search(cli) {
410        let params = HybridQueryParams {
411            cli,
412            query_string,
413            search_path,
414            validation_options,
415            no_parallel,
416            start,
417            query_type,
418            variables,
419        };
420        execute_hybrid_query(streams, &params)
421    } else {
422        execute_semantic_query(
423            cli,
424            query_string,
425            search_path,
426            validation_options,
427            no_parallel,
428            variables,
429        )
430    }
431}
432
433fn render_query_outcome(
434    streams: &mut OutputStreams,
435    outcome: QueryExecutionOutcome,
436    params: QueryRenderParams<'_>,
437) -> Result<()> {
438    let QueryRenderParams {
439        cli,
440        query_string,
441        verbose,
442        start,
443        relation_context,
444        index_info,
445    } = params;
446    if let QueryExecutionOutcome::Continue(mut execution) = outcome {
447        let elapsed = start.elapsed();
448        let execution = &mut *execution;
449        let diagnostics = QueryDiagnostics::Standard { index_info };
450        render_semantic_results(
451            cli,
452            streams,
453            query_string,
454            &mut execution.symbols,
455            &execution.stats,
456            elapsed,
457            verbose,
458            execution.executor.as_ref(),
459            &diagnostics,
460            relation_context,
461        )?;
462    }
463
464    Ok(())
465}
466
467fn execute_hybrid_query(
468    streams: &mut OutputStreams,
469    params: &HybridQueryParams<'_>,
470) -> Result<QueryExecutionOutcome> {
471    let cli = params.cli;
472    let query_string = params.query_string;
473    let search_path = params.search_path;
474    let validation_options = params.validation_options;
475    let no_parallel = params.no_parallel;
476    let start = params.start;
477    let query_type = params.query_type;
478    let variables = params.variables;
479
480    // Resolve variables in the query string for hybrid search.
481    // FallbackSearchEngine doesn't support variable threading, so we resolve
482    // at the AST level and serialize back to a query string before passing it.
483    let effective_query = if let Some(vars) = variables {
484        let ast = QueryParser::parse_query(query_string)
485            .map_err(|e| anyhow::anyhow!("Failed to parse query for variable resolution: {e}"))?;
486        let resolved = sqry_core::query::types::resolve_variables(&ast.root, vars)
487            .map_err(|e| anyhow::anyhow!("{e}"))?;
488        let resolved_ast = sqry_core::query::types::Query {
489            root: resolved,
490            span: ast.span,
491        };
492        std::borrow::Cow::Owned(sqry_core::query::parsed_query::serialize_query(
493            &resolved_ast,
494        ))
495    } else {
496        std::borrow::Cow::Borrowed(query_string)
497    };
498
499    // Use hybrid search engine with plugin-enabled executor
500    // This allows metadata queries like async:true and visibility:public to work
501    let config = build_hybrid_config(cli);
502    let mut executor = create_executor_with_plugins_for_cli(cli, search_path)?
503        .with_validation_options(validation_options);
504    if no_parallel {
505        executor = executor.without_parallel();
506    }
507    let mut engine = FallbackSearchEngine::with_config_and_executor(config.clone(), executor)?;
508
509    emit_search_mode_diagnostic(cli, streams, query_type, &config)?;
510
511    let results = run_hybrid_search(cli, &mut engine, &effective_query, search_path)?;
512    let elapsed = start.elapsed();
513
514    match results {
515        SearchResults::Semantic { results, .. } => {
516            let symbols = query_results_to_display_symbols(&results);
517            Ok(QueryExecutionOutcome::Continue(Box::new(QueryExecution {
518                stats: build_query_stats(true, symbols.len()),
519                symbols,
520                executor: None,
521            })))
522        }
523        SearchResults::Text { matches, .. } => {
524            render_text_results(cli, streams, &matches, elapsed)?;
525            Ok(QueryExecutionOutcome::Terminal)
526        }
527    }
528}
529
530fn execute_semantic_query(
531    cli: &Cli,
532    query_string: &str,
533    search_path: &Path,
534    validation_options: ValidationOptions,
535    no_parallel: bool,
536    variables: Option<&std::collections::HashMap<String, String>>,
537) -> Result<QueryExecutionOutcome> {
538    let mut executor = create_executor_with_plugins_for_cli(cli, search_path)?
539        .with_validation_options(validation_options);
540    if no_parallel {
541        executor = executor.without_parallel();
542    }
543    let query_results =
544        executor.execute_on_graph_with_variables(query_string, search_path, variables)?;
545    let symbols = query_results_to_display_symbols(&query_results);
546    let stats = SimpleQueryStats { used_index: true };
547    Ok(QueryExecutionOutcome::Continue(Box::new(QueryExecution {
548        stats,
549        symbols,
550        executor: Some(executor),
551    })))
552}
553
554fn emit_search_mode_diagnostic(
555    cli: &Cli,
556    streams: &mut OutputStreams,
557    query_type: QueryType,
558    config: &FallbackConfig,
559) -> Result<()> {
560    if !config.show_search_mode || cli.json {
561        return Ok(());
562    }
563
564    let message = match query_type {
565        QueryType::Semantic => "[Semantic search mode]",
566        QueryType::Text => "[Text search mode]",
567        QueryType::Hybrid => "[Hybrid mode: trying semantic first...]",
568    };
569    streams.write_diagnostic(message)?;
570    Ok(())
571}
572
573fn run_hybrid_search(
574    cli: &Cli,
575    engine: &mut FallbackSearchEngine,
576    query_string: &str,
577    search_path: &Path,
578) -> Result<SearchResults> {
579    if cli.text {
580        // Force text-only search
581        engine.search_text_only(query_string, search_path)
582    } else if cli.semantic {
583        // Force semantic-only search
584        engine.search_semantic_only(query_string, search_path)
585    } else {
586        // Automatic hybrid search with fallback
587        engine.search(query_string, search_path)
588    }
589}
590
/// Build the statistics record shown after a query.
///
/// Only `used_index` is recorded; `_symbol_count` is accepted but not used.
fn build_query_stats(used_index: bool, _symbol_count: usize) -> SimpleQueryStats {
    SimpleQueryStats { used_index }
}
594
595fn render_text_results(
596    cli: &Cli,
597    streams: &mut OutputStreams,
598    matches: &[TextMatch],
599    elapsed: Duration,
600) -> Result<()> {
601    if cli.json {
602        // JSON mode: serialize text matches directly
603        let json_output = serde_json::json!({
604            "text_matches": matches,
605            "match_count": matches.len(),
606            "execution_time_ms": elapsed.as_millis(),
607        });
608        streams.write_result(&serde_json::to_string_pretty(&json_output)?)?;
609    } else if cli.count {
610        // Count mode: just show the count
611        streams.write_result(&matches.len().to_string())?;
612    } else {
613        // Normal mode: print matches in grep format
614        for m in matches {
615            streams.write_result(&format!(
616                "{}:{}:{}",
617                m.path.display(),
618                m.line,
619                m.line_text.trim()
620            ))?;
621        }
622
623        // Show performance info to stderr (not in JSON or count mode)
624        streams.write_diagnostic(&format!(
625            "\nQuery executed ({}ms) - {} text matches found",
626            elapsed.as_millis(),
627            matches.len()
628        ))?;
629    }
630
631    Ok(())
632}
633
634// RKG: CODE:SQRY-CLI implements REQ:SQRY-RUBY-QUALIFIED-CALLERS (MEDIUM-2 fix)
635fn run_query_with_session(
636    cli: &Cli,
637    streams: &mut OutputStreams,
638    query_string: &str,
639    search_path: &str,
640    verbose: bool,
641    _no_parallel: bool,
642    relation_ctx: &RelationDisplayContext,
643) -> Result<()> {
644    if cli.text {
645        bail!("--session is only available for semantic queries (remove --text)");
646    }
647
648    let search_path_path = Path::new(search_path);
649
650    // Index ancestor discovery for session mode
651    let (workspace, relative_scope, is_file_query, is_ancestor) =
652        resolve_session_index(search_path_path)?;
653
654    // Build index diagnostic info (for ancestor index or file queries)
655    let index_info = if is_ancestor || relative_scope.is_some() {
656        // Build filtered_to with proper format (file vs directory)
657        let filtered_to = relative_scope.as_ref().map(|p| {
658            if is_file_query {
659                p.to_string_lossy().into_owned()
660            } else {
661                format!("{}/**", p.display())
662            }
663        });
664        IndexDiagnosticInfo {
665            index_root: Some(workspace.clone()),
666            filtered_to,
667            used_ancestor_index: is_ancestor,
668        }
669    } else {
670        IndexDiagnosticInfo::default()
671    };
672
673    // Augment query with scope filter if using ancestor index
674    let effective_query: std::borrow::Cow<'_, str> = if let Some(ref scope) = relative_scope {
675        std::borrow::Cow::Owned(augment_query_with_scope(query_string, scope, is_file_query))
676    } else {
677        std::borrow::Cow::Borrowed(query_string)
678    };
679
680    // Check session cache first before expensive validation
681    // (fixes CODEX MEDIUM-2: avoid validation on warm queries)
682    let mut guard = QUERY_SESSION
683        .lock()
684        .expect("global session cache mutex poisoned");
685
686    if guard.is_none() {
687        // Cold start: create session (graph will be loaded on first query)
688        let config = sqry_core::session::SessionConfig::default();
689        *guard = Some(
690            SessionManager::with_config(config).context("failed to initialise session manager")?,
691        );
692    }
693
694    let session = guard.as_ref().expect("session manager must be initialised");
695    let before = session.stats();
696    let start = Instant::now();
697    let query_results = session
698        .query(&workspace, &effective_query)
699        .with_context(|| format!("failed to execute query \"{}\"", &effective_query))?;
700    let elapsed = start.elapsed();
701    let after = session.stats();
702    let cache_hit = after.cache_hits > before.cache_hits;
703
704    let mut symbols = query_results_to_display_symbols(&query_results);
705
706    let stats = SimpleQueryStats { used_index: true };
707
708    let diagnostics = QueryDiagnostics::Session {
709        cache_hit,
710        stats: after,
711        index_info,
712    };
713    render_semantic_results(
714        cli,
715        streams,
716        &effective_query,
717        &mut symbols,
718        &stats,
719        elapsed,
720        verbose,
721        None,
722        &diagnostics,
723        relation_ctx,
724    )
725}
726
727/// Resolve index location for session mode, walking up directory tree if needed.
728///
729/// Returns `(index_root, relative_scope, is_file_query, is_ancestor)` for query augmentation.
730/// For session mode, file paths are not supported (must be directory).
731fn resolve_session_index(path: &Path) -> Result<(PathBuf, Option<PathBuf>, bool, bool)> {
732    if !path.exists() {
733        bail!(
734            "session mode requires a directory ({} does not exist)",
735            path.display()
736        );
737    }
738
739    // Session mode requires a directory, not a file
740    if path.is_file() {
741        bail!(
742            "session mode requires a directory path ({} is a file). \
743             For file-specific queries, omit --session.",
744            path.display()
745        );
746    }
747
748    // Use index discovery to find nearest .sqry-index
749    if let Some(loc) = find_nearest_index(path) {
750        let relative_scope = if loc.requires_scope_filter {
751            loc.relative_scope()
752        } else {
753            None
754        };
755        Ok((
756            loc.index_root,
757            relative_scope,
758            loc.is_file_query,
759            loc.is_ancestor,
760        ))
761    } else {
762        bail!(
763            "no index found at {} or any parent directory. \
764             Run `sqry index <root>` first.",
765            path.display()
766        );
767    }
768}
769
770fn ensure_repo_predicate_not_present(query_string: &str) -> Result<()> {
771    if let Ok(query) = QueryParser::parse_query(query_string) {
772        if expr_has_repo_predicate(&query.root) {
773            bail!(
774                "repo: filters are only supported via `sqry workspace query` (multi-repo command)"
775            );
776        }
777        return Ok(());
778    }
779
780    if query_string.contains("repo:") {
781        bail!("repo: filters are only supported via `sqry workspace query` (multi-repo command)");
782    }
783
784    Ok(())
785}
786
787fn expr_has_repo_predicate(expr: &Expr) -> bool {
788    match expr {
789        Expr::And(operands) | Expr::Or(operands) => operands.iter().any(expr_has_repo_predicate),
790        Expr::Not(operand) => expr_has_repo_predicate(operand),
791        Expr::Condition(condition) => condition.field.as_str() == "repo",
792        Expr::Join(join) => {
793            expr_has_repo_predicate(&join.left) || expr_has_repo_predicate(&join.right)
794        }
795    }
796}
797
/// Info about which index was used and any scope filtering applied.
///
/// The `Default` value (no root, no filter, `used_ancestor_index == false`)
/// is used when no index information is available. `Debug` and `Clone` are
/// derived so the value can be logged and duplicated like the other small
/// diagnostic records in this file.
#[derive(Debug, Clone, Default)]
struct IndexDiagnosticInfo {
    /// Path to the index root directory (where .sqry-index lives)
    index_root: Option<PathBuf>,
    /// Scope filter applied (e.g., "src/**" or "main.rs")
    filtered_to: Option<String>,
    /// True if index was found in an ancestor directory
    used_ancestor_index: bool,
}
808
/// Diagnostic context handed to the semantic renderer.
///
/// Both variants carry the index info; the session variant adds cache details.
enum QueryDiagnostics {
    /// Diagnostics for a regular (non-session) query.
    Standard {
        /// Which index was used and which scope filter was applied.
        index_info: IndexDiagnosticInfo,
    },
    /// Diagnostics for a query served by the persistent session.
    Session {
        /// True when the session's `cache_hits` counter increased during the query.
        cache_hit: bool,
        /// Session statistics captured after the query ran.
        stats: SessionStats,
        /// Which index was used and which scope filter was applied.
        index_info: IndexDiagnosticInfo,
    },
}
819
/// Outcome of applying the result limit to a symbol list.
///
/// Plain data (two counters and a flag), so it derives `Debug`, `Clone` and
/// `Copy` for cheap passing and easy logging.
#[derive(Debug, Clone, Copy)]
struct QueryLimitInfo {
    /// Number of symbols before the limit was applied.
    total_matches: usize,
    /// The limit that was applied.
    limit: usize,
    /// True when `total_matches` exceeded `limit` and symbols were dropped.
    truncated: bool,
}
825
826#[allow(clippy::too_many_arguments)]
827fn render_semantic_results(
828    cli: &Cli,
829    streams: &mut OutputStreams,
830    query_string: &str,
831    symbols: &mut Vec<DisplaySymbol>,
832    stats: &SimpleQueryStats,
833    elapsed: Duration,
834    verbose: bool,
835    executor_opt: Option<&QueryExecutor>,
836    diagnostics: &QueryDiagnostics,
837    relation_ctx: &RelationDisplayContext,
838) -> Result<()> {
839    // Optional sorting (opt-in)
840    apply_sorting(cli, symbols);
841
842    // Apply limit if specified (default: 1000 for query command)
843    let limit_info = apply_symbol_limit(symbols, cli.limit.unwrap_or(DEFAULT_QUERY_LIMIT));
844
845    // Extract index info from diagnostics for JSON output
846    let index_info = match diagnostics {
847        QueryDiagnostics::Standard { index_info }
848        | QueryDiagnostics::Session { index_info, .. } => index_info,
849    };
850
851    // Build metadata for structured JSON output
852    let metadata =
853        build_formatter_metadata(query_string, limit_info.total_matches, elapsed, index_info);
854
855    let identity_overrides = build_identity_overrides(cli, symbols, relation_ctx);
856
857    let display_symbols =
858        build_display_symbols_with_identities(symbols, identity_overrides.as_ref());
859
860    // Create formatter based on CLI flags
861    format_semantic_output(cli, streams, &display_symbols, &metadata)?;
862
863    maybe_emit_truncation_notice(cli, &limit_info);
864
865    if cli.json || cli.count {
866        return Ok(());
867    }
868
869    write_query_summary(streams, stats, elapsed, symbols.len(), diagnostics)?;
870
871    if verbose {
872        emit_verbose_cache_stats(streams, stats, executor_opt, diagnostics)?;
873    }
874
875    maybe_emit_debug_cache(cli, streams, executor_opt, stats)?;
876
877    Ok(())
878}
879
880fn apply_sorting(cli: &Cli, symbols: &mut [DisplaySymbol]) {
881    if let Some(sort_field) = cli.sort {
882        crate::commands::sort::sort_symbols(symbols, sort_field);
883    }
884}
885
886fn apply_symbol_limit(symbols: &mut Vec<DisplaySymbol>, limit: usize) -> QueryLimitInfo {
887    let total_matches = symbols.len();
888    let truncated = total_matches > limit;
889    if truncated {
890        symbols.truncate(limit);
891    }
892    QueryLimitInfo {
893        total_matches,
894        limit,
895        truncated,
896    }
897}
898
899fn build_formatter_metadata(
900    query_string: &str,
901    total_matches: usize,
902    elapsed: Duration,
903    index_info: &IndexDiagnosticInfo,
904) -> crate::output::FormatterMetadata {
905    crate::output::FormatterMetadata {
906        pattern: Some(query_string.to_string()),
907        total_matches,
908        execution_time: elapsed,
909        filters: sqry_core::json_response::Filters {
910            kind: None,
911            lang: None,
912            ignore_case: false,
913            exact: false,
914            fuzzy: None,
915        },
916        index_age_seconds: None,
917        // Include scope info when any filtering is applied (ancestor or file query)
918        used_ancestor_index: if index_info.used_ancestor_index || index_info.filtered_to.is_some() {
919            Some(index_info.used_ancestor_index)
920        } else {
921            None
922        },
923        filtered_to: index_info.filtered_to.clone(),
924    }
925}
926
927fn build_identity_overrides(
928    cli: &Cli,
929    symbols: &[DisplaySymbol],
930    relation_ctx: &RelationDisplayContext,
931) -> Option<DisplayIdentities> {
932    if cli.qualified_names || cli.json {
933        Some(compute_display_identities(symbols, relation_ctx))
934    } else {
935        None
936    }
937}
938
939fn format_semantic_output(
940    cli: &Cli,
941    streams: &mut OutputStreams,
942    display_symbols: &[DisplaySymbol],
943    metadata: &crate::output::FormatterMetadata,
944) -> Result<()> {
945    let formatter = create_formatter(cli);
946    formatter.format(display_symbols, Some(metadata), streams)?;
947    Ok(())
948}
949
950fn maybe_emit_truncation_notice(cli: &Cli, limit_info: &QueryLimitInfo) {
951    if !cli.json && limit_info.truncated {
952        eprintln!(
953            "\nShowing {} of {} matches (use --limit to adjust)",
954            limit_info.limit, limit_info.total_matches
955        );
956    }
957}
958
959fn build_display_symbols_with_identities(
960    symbols: &[DisplaySymbol],
961    identity_overrides: Option<&DisplayIdentities>,
962) -> Vec<DisplaySymbol> {
963    match identity_overrides {
964        Some(identities) => symbols
965            .iter()
966            .enumerate()
967            .map(|(idx, symbol)| {
968                let invoker_identity = identities
969                    .invoker_identities
970                    .get(idx)
971                    .and_then(Clone::clone);
972                let target_identity = identities.target_identities.get(idx).and_then(Clone::clone);
973
974                // Use the appropriate constructor based on which identity is present
975                if invoker_identity.is_some() {
976                    symbol.clone().with_caller_identity(invoker_identity)
977                } else if target_identity.is_some() {
978                    symbol.clone().with_callee_identity(target_identity)
979                } else {
980                    symbol.clone()
981                }
982            })
983            .collect(),
984        None => symbols.to_vec(),
985    }
986}
987
988fn write_query_summary(
989    streams: &mut OutputStreams,
990    stats: &SimpleQueryStats,
991    elapsed: Duration,
992    symbol_count: usize,
993    diagnostics: &QueryDiagnostics,
994) -> Result<()> {
995    use std::fmt::Write as _;
996
997    streams.write_diagnostic("")?;
998
999    // Extract index_info from diagnostics
1000    let index_info = match diagnostics {
1001        QueryDiagnostics::Standard { index_info }
1002        | QueryDiagnostics::Session { index_info, .. } => index_info,
1003    };
1004
1005    // Build index status message with ancestor info if applicable
1006    let index_status = if stats.used_index {
1007        if index_info.used_ancestor_index {
1008            if let Some(ref root) = index_info.index_root {
1009                format!("✓ Using index from {}", root.display())
1010            } else {
1011                "✓ Used index".to_string()
1012            }
1013        } else {
1014            "✓ Used index".to_string()
1015        }
1016    } else {
1017        "ℹ No index found".to_string()
1018    };
1019
1020    let mut msg = format!(
1021        "{} - Query executed ({}ms) - {} symbols found",
1022        index_status,
1023        elapsed.as_millis(),
1024        symbol_count
1025    );
1026
1027    // Add scope filter info if applicable (ancestor index or file query)
1028    if let Some(ref filtered_to) = index_info.filtered_to {
1029        let _ = write!(msg, " (filtered to {filtered_to})");
1030    }
1031
1032    if let QueryDiagnostics::Session { cache_hit, .. } = diagnostics {
1033        let cache_state = if *cache_hit {
1034            "session cache hit"
1035        } else {
1036            "session cache miss"
1037        };
1038        let _ = write!(msg, " [{cache_state}]");
1039    }
1040
1041    streams.write_diagnostic(&msg)?;
1042
1043    Ok(())
1044}
1045
1046fn emit_verbose_cache_stats(
1047    streams: &mut OutputStreams,
1048    _stats: &SimpleQueryStats,
1049    executor_opt: Option<&QueryExecutor>,
1050    diagnostics: &QueryDiagnostics,
1051) -> Result<()> {
1052    match (executor_opt, diagnostics) {
1053        (Some(executor), _) => emit_executor_cache_stats(streams, executor),
1054        (None, QueryDiagnostics::Session { stats, .. }) => emit_session_cache_stats(streams, stats),
1055        _ => emit_hybrid_cache_notice(streams),
1056    }
1057}
1058
1059fn emit_executor_cache_stats(streams: &mut OutputStreams, executor: &QueryExecutor) -> Result<()> {
1060    let (parse_stats, result_stats) = executor.cache_stats();
1061
1062    streams.write_diagnostic("")?;
1063    streams.write_diagnostic("Cache Statistics:")?;
1064
1065    let parse_msg = format!(
1066        "  Parse cache:  {:.1}% hit rate ({} hits, {} misses, {} evictions)",
1067        parse_stats.hit_rate() * 100.0,
1068        parse_stats.hits,
1069        parse_stats.misses,
1070        parse_stats.evictions,
1071    );
1072    streams.write_diagnostic(&parse_msg)?;
1073
1074    let result_msg = format!(
1075        "  Result cache: {:.1}% hit rate ({} hits, {} misses, {} evictions)",
1076        result_stats.hit_rate() * 100.0,
1077        result_stats.hits,
1078        result_stats.misses,
1079        result_stats.evictions,
1080    );
1081    streams.write_diagnostic(&result_msg)?;
1082
1083    Ok(())
1084}
1085
1086fn emit_session_cache_stats(streams: &mut OutputStreams, stats: &SessionStats) -> Result<()> {
1087    let total_cache_events = stats.cache_hits + stats.cache_misses;
1088    let hit_rate = if total_cache_events > 0 {
1089        (u64_to_f64_lossy(stats.cache_hits) / u64_to_f64_lossy(total_cache_events)) * 100.0
1090    } else {
1091        0.0
1092    };
1093
1094    streams.write_diagnostic("")?;
1095    streams.write_diagnostic("Session statistics:")?;
1096    let _ = streams.write_diagnostic(&format!("  Cached indexes : {}", stats.cached_graphs));
1097    let _ = streams.write_diagnostic(&format!("  Total queries  : {}", stats.total_queries));
1098    let _ = streams.write_diagnostic(&format!(
1099        "  Cache hits     : {} ({hit_rate:.1}% hit rate)",
1100        stats.cache_hits
1101    ));
1102    let _ = streams.write_diagnostic(&format!("  Cache misses   : {}", stats.cache_misses));
1103    let _ = streams.write_diagnostic(&format!(
1104        "  Estimated memory: ~{} MB",
1105        stats.total_memory_mb
1106    ));
1107
1108    Ok(())
1109}
1110
1111fn emit_hybrid_cache_notice(streams: &mut OutputStreams) -> Result<()> {
1112    streams.write_diagnostic("")?;
1113    streams.write_diagnostic("Cache statistics not available in hybrid search mode")?;
1114    Ok(())
1115}
1116
/// Call-identity overrides to attach to displayed symbols.
///
/// Each vector is indexed by symbol position; a `None` slot means no identity
/// of that kind is attached to the symbol at that index.
struct DisplayIdentities {
    /// Identities applied to symbols via `with_caller_identity`.
    invoker_identities: Vec<Option<CallIdentityMetadata>>,
    /// Identities applied to symbols via `with_callee_identity`.
    target_identities: Vec<Option<CallIdentityMetadata>>,
}
1121
1122fn compute_display_identities(
1123    symbols: &[DisplaySymbol],
1124    relation_ctx: &RelationDisplayContext,
1125) -> DisplayIdentities {
1126    // Build identity metadata from symbol qualified names for relation queries.
1127    // For callers: queries, each result is a caller and gets caller_identity.
1128    // For callees: queries, each result is a callee and gets callee_identity.
1129    let has_incoming_targets = !relation_ctx.caller_targets.is_empty();
1130    let has_outgoing_targets = !relation_ctx.callee_targets.is_empty();
1131
1132    let identities: Vec<Option<CallIdentityMetadata>> = symbols
1133        .iter()
1134        .map(build_identity_from_display_symbol)
1135        .collect();
1136
1137    if has_incoming_targets {
1138        DisplayIdentities {
1139            invoker_identities: identities,
1140            target_identities: vec![None; symbols.len()],
1141        }
1142    } else if has_outgoing_targets {
1143        DisplayIdentities {
1144            invoker_identities: vec![None; symbols.len()],
1145            target_identities: identities,
1146        }
1147    } else {
1148        DisplayIdentities {
1149            invoker_identities: vec![None; symbols.len()],
1150            target_identities: vec![None; symbols.len()],
1151        }
1152    }
1153}
1154
1155fn build_identity_from_display_symbol(symbol: &DisplaySymbol) -> Option<CallIdentityMetadata> {
1156    let language = symbol.metadata.get("__raw_language").map(String::as_str);
1157    let is_static = symbol
1158        .metadata
1159        .get("static")
1160        .is_some_and(|value| value == "true");
1161
1162    build_identity_from_qualified_name(&symbol.qualified_name, &symbol.kind, language, is_static)
1163}
/// Builds call-identity metadata from a qualified name and symbol attributes.
///
/// Thin wrapper that forwards every argument unchanged to
/// `call_identity_from_qualified_name`. The unit tests in this file exercise
/// it with Ruby methods, where `is_static` selects between the instance
/// (`Class#method`) and singleton (`Class.method`) display forms.
fn build_identity_from_qualified_name(
    qualified: &str,
    kind: &str,
    language: Option<&str>,
    is_static: bool,
) -> Option<CallIdentityMetadata> {
    call_identity_from_qualified_name(qualified, kind, language, is_static)
}
1172
1173/// Format execution steps for display
1174fn format_execution_steps(steps: &[sqry_core::query::ExecutionStep]) -> String {
1175    steps
1176        .iter()
1177        .map(|step| {
1178            format!(
1179                "  {}. {} ({}ms)",
1180                step.step_num, step.operation, step.time_ms
1181            )
1182        })
1183        .collect::<Vec<_>>()
1184        .join("\n")
1185}
1186
1187/// Format cache status for display
1188fn format_cache_status(status: &sqry_core::query::CacheStatus) -> String {
1189    match (status.parse_cache_hit, status.result_cache_hit) {
1190        (true, true) => "HIT (100% cached)".to_string(),
1191        (true, false) => "PARTIAL HIT (query cached, results computed)".to_string(),
1192        (false, true) => "PARTIAL HIT (query parsed, results cached)".to_string(),
1193        (false, false) => "MISS (first run)".to_string(),
1194    }
1195}
1196
/// Reports whether cache debugging is enabled via the environment.
///
/// Returns `true` only when `SQRY_CACHE_DEBUG` is set to `1` or to `true`
/// (ASCII case-insensitive). An unset or unreadable variable counts as
/// disabled.
fn env_debug_cache_enabled() -> bool {
    env::var("SQRY_CACHE_DEBUG")
        .is_ok_and(|flag| flag == "1" || flag.eq_ignore_ascii_case("true"))
}
1203
/// Relation targets found in a query, used to decide how results are labelled.
///
/// Both lists are empty by default and are filled by
/// `collect_relation_targets` while walking the parsed query.
#[derive(Default)]
struct RelationDisplayContext {
    /// Non-empty string values of `callers:` conditions in the query.
    caller_targets: Vec<String>,
    /// Non-empty string values of `callees:` conditions in the query.
    callee_targets: Vec<String>,
}
1209
1210impl RelationDisplayContext {
1211    fn from_query(query_str: &str) -> Self {
1212        match QueryParser::parse_query(query_str) {
1213            Ok(ast) => {
1214                let mut ctx = Self::default();
1215                collect_relation_targets(&ast.root, &mut ctx);
1216                ctx
1217            }
1218            Err(_) => Self::default(),
1219        }
1220    }
1221}
1222
1223fn collect_relation_targets(expr: &Expr, ctx: &mut RelationDisplayContext) {
1224    match expr {
1225        Expr::And(operands) | Expr::Or(operands) => {
1226            for operand in operands {
1227                collect_relation_targets(operand, ctx);
1228            }
1229        }
1230        Expr::Not(inner) => collect_relation_targets(inner, ctx),
1231        Expr::Join(join) => {
1232            collect_relation_targets(&join.left, ctx);
1233            collect_relation_targets(&join.right, ctx);
1234        }
1235        Expr::Condition(condition) => match condition.field.as_str() {
1236            "callers" => {
1237                if let Value::String(value) = &condition.value
1238                    && !value.is_empty()
1239                {
1240                    ctx.caller_targets.push(value.clone());
1241                }
1242            }
1243            "callees" => {
1244                if let Value::String(value) = &condition.value
1245                    && !value.is_empty()
1246                {
1247                    ctx.callee_targets.push(value.clone());
1248                }
1249            }
1250            _ => {}
1251        },
1252    }
1253}
1254
/// Returns `true` when cache debugging was requested, either through the
/// CLI's `debug_cache` flag or through the `SQRY_CACHE_DEBUG` environment
/// variable (see `env_debug_cache_enabled`).
fn should_debug_cache(cli: &Cli) -> bool {
    cli.debug_cache || env_debug_cache_enabled()
}
1258
1259// RKG: CODE:SQRY-CLI implements REQ:SQRY-P2-6-CACHE-EVICTION-POLICY
1260fn maybe_emit_debug_cache(
1261    cli: &Cli,
1262    streams: &mut OutputStreams,
1263    executor_opt: Option<&QueryExecutor>,
1264    _stats: &SimpleQueryStats,
1265) -> Result<()> {
1266    if !should_debug_cache(cli) {
1267        return Ok(());
1268    }
1269
1270    let Some(executor) = executor_opt else {
1271        streams.write_diagnostic("CacheStats unavailable in this mode")?;
1272        return Ok(());
1273    };
1274
1275    let (parse_stats, result_stats) = executor.cache_stats();
1276
1277    let debug_line = format!(
1278        "CacheStats{{parse_hits={}, parse_misses={}, result_hits={}, result_misses={}}}",
1279        parse_stats.hits, parse_stats.misses, result_stats.hits, result_stats.misses,
1280    );
1281    streams.write_diagnostic(&debug_line)?;
1282    Ok(())
1283}
1284
1285/// Build hybrid search configuration from CLI flags
1286fn build_hybrid_config(cli: &Cli) -> FallbackConfig {
1287    let mut config = FallbackConfig::from_env();
1288
1289    // Override with CLI flags
1290    if cli.no_fallback {
1291        config.fallback_enabled = false;
1292    }
1293
1294    config.text_context_lines = cli.context;
1295    config.max_text_results = cli.max_text_results;
1296
1297    // Disable search mode output in JSON mode
1298    if cli.json {
1299        config.show_search_mode = false;
1300    }
1301
1302    config
1303}
1304
1305/// Determine if hybrid search should be used based on CLI flags
1306fn should_use_hybrid_search(cli: &Cli) -> bool {
1307    // Cache debugging requires direct access to QueryExecutor stats.
1308    if should_debug_cache(cli) {
1309        return false;
1310    }
1311
1312    // Always use hybrid search (it handles --text, --semantic, and hybrid modes)
1313    // The only reason NOT to use it would be if hybrid search is explicitly disabled
1314    // via environment variable or if we need old behavior for compatibility
1315    true
1316}
1317
1318/// Create a `QueryExecutor` with all built-in plugins registered
1319pub(crate) fn create_executor_with_plugins() -> QueryExecutor {
1320    let plugin_manager = crate::plugin_defaults::create_plugin_manager();
1321    QueryExecutor::with_plugin_manager(plugin_manager)
1322}
1323
1324pub(crate) fn create_executor_with_plugins_for_cli(
1325    cli: &Cli,
1326    search_path: &Path,
1327) -> Result<QueryExecutor> {
1328    let effective_root = find_nearest_index(search_path)
1329        .map_or_else(|| search_path.to_path_buf(), |location| location.index_root);
1330    let resolved_plugins = plugin_defaults::resolve_plugin_selection(
1331        cli,
1332        &effective_root,
1333        PluginSelectionMode::ReadOnly,
1334    )?;
1335    Ok(QueryExecutor::with_plugin_manager(
1336        resolved_plugins.plugin_manager,
1337    ))
1338}
1339
/// Converts a `u64` to `f64` without an `as` cast.
///
/// Values up to `u32::MAX` convert exactly; anything larger is clamped to
/// `u32::MAX`, so the result is only suitable for approximate display values.
fn u64_to_f64_lossy(value: u64) -> f64 {
    match u32::try_from(value) {
        Ok(exact) => f64::from(exact),
        Err(_) => f64::from(u32::MAX),
    }
}
1344
1345// ============================================================================
1346// Variable, Join, and Pipeline support
1347// ============================================================================
1348
1349/// Parse `--var KEY=VALUE` arguments into a `HashMap`.
1350fn parse_variable_args(args: &[String]) -> Result<std::collections::HashMap<String, String>> {
1351    let mut map = std::collections::HashMap::new();
1352    for arg in args {
1353        let (key, value) = arg
1354            .split_once('=')
1355            .ok_or_else(|| anyhow::anyhow!("Invalid --var format: '{arg}'. Expected KEY=VALUE"))?;
1356        if key.is_empty() {
1357            bail!("Variable name cannot be empty in --var '{arg}'");
1358        }
1359        map.insert(key.to_string(), value.to_string());
1360    }
1361    Ok(map)
1362}
1363
1364/// Check if a query string contains a join expression at the root level.
1365///
1366/// Returns `false` on parse errors (the normal flow will handle the error).
1367fn is_join_query(query_str: &str) -> bool {
1368    match QueryParser::parse_query(query_str) {
1369        Ok(ast) => matches!(ast.root, Expr::Join(_)),
1370        Err(_) => false,
1371    }
1372}
1373
1374/// Detect a pipeline query (base query | aggregation stages).
1375///
1376/// If the query string contains a `|` character, pipeline parse errors are
1377/// treated as hard errors (the user intended a pipeline query). If no `|`
1378/// is present, returns `None` (not a pipeline query).
1379fn detect_pipeline_query(
1380    query_str: &str,
1381) -> Result<Option<sqry_core::query::types::PipelineQuery>> {
1382    match QueryParser::parse_pipeline_query(query_str) {
1383        Ok(result) => Ok(result),
1384        Err(e) => {
1385            // If the query contains a pipe, the user intended a pipeline query
1386            // and the parse error should be surfaced (not silently ignored).
1387            if query_str.contains('|') {
1388                Err(anyhow::anyhow!("Pipeline parse error: {e}"))
1389            } else {
1390                Ok(None)
1391            }
1392        }
1393    }
1394}
1395
1396/// Run a join query and render results.
1397fn run_join_query(
1398    cli: &Cli,
1399    streams: &mut OutputStreams,
1400    query_string: &str,
1401    search_path: &str,
1402    no_parallel: bool,
1403    variables: Option<&std::collections::HashMap<String, String>>,
1404) -> Result<()> {
1405    let validation_options = build_validation_options(cli);
1406    let mut executor = create_executor_with_plugins_for_cli(cli, Path::new(search_path))?
1407        .with_validation_options(validation_options);
1408    if no_parallel {
1409        executor = executor.without_parallel();
1410    }
1411
1412    let resolved_path = Path::new(search_path);
1413    let join_results = executor.execute_join(query_string, resolved_path, variables)?;
1414
1415    if join_results.truncated() {
1416        streams.write_diagnostic(&format!(
1417            "Join query: {} pairs matched via {} (results truncated — cap reached)",
1418            join_results.len(),
1419            join_results.edge_kind()
1420        ))?;
1421    } else {
1422        streams.write_diagnostic(&format!(
1423            "Join query: {} pairs matched via {}",
1424            join_results.len(),
1425            join_results.edge_kind()
1426        ))?;
1427    }
1428
1429    for pair in join_results.iter() {
1430        let left_name = pair.left.name().unwrap_or_default();
1431        let left_path = pair
1432            .left
1433            .relative_path()
1434            .map_or_else(|| "<unknown>".to_string(), |p| p.display().to_string());
1435        let right_name = pair.right.name().unwrap_or_default();
1436        let right_path = pair
1437            .right
1438            .relative_path()
1439            .map_or_else(|| "<unknown>".to_string(), |p| p.display().to_string());
1440
1441        if cli.json {
1442            // JSON mode: each pair as a JSON object
1443            let json = serde_json::json!({
1444                "left": {
1445                    "name": left_name.as_ref(),
1446                    "kind": pair.left.kind().as_str(),
1447                    "path": left_path,
1448                    "line": pair.left.start_line(),
1449                },
1450                "edge": pair.edge_kind.to_string(),
1451                "right": {
1452                    "name": right_name.as_ref(),
1453                    "kind": pair.right.kind().as_str(),
1454                    "path": right_path,
1455                    "line": pair.right.start_line(),
1456                },
1457            });
1458            streams.write_result(&json.to_string())?;
1459        } else {
1460            streams.write_result(&format!(
1461                "{} ({}:{}) {} {} ({}:{})",
1462                left_name,
1463                left_path,
1464                pair.left.start_line(),
1465                pair.edge_kind,
1466                right_name,
1467                right_path,
1468                pair.right.start_line(),
1469            ))?;
1470        }
1471    }
1472
1473    Ok(())
1474}
1475
1476/// Run a pipeline query (base query + aggregation stages) and render results.
1477fn run_pipeline_query(
1478    cli: &Cli,
1479    streams: &mut OutputStreams,
1480    _query_string: &str,
1481    search_path: &str,
1482    pipeline: &sqry_core::query::types::PipelineQuery,
1483    no_parallel: bool,
1484    variables: Option<&std::collections::HashMap<String, String>>,
1485) -> Result<()> {
1486    let validation_options = build_validation_options(cli);
1487    let mut executor = create_executor_with_plugins_for_cli(cli, Path::new(search_path))?
1488        .with_validation_options(validation_options);
1489    if no_parallel {
1490        executor = executor.without_parallel();
1491    }
1492
1493    let resolved_path = Path::new(search_path);
1494
1495    // Execute the base query portion (before the pipe)
1496    // Serialize the base query from the parsed AST for reliable reconstruction
1497    let base_query = sqry_core::query::parsed_query::serialize_query(&pipeline.query);
1498
1499    let results =
1500        executor.execute_on_graph_with_variables(&base_query, resolved_path, variables)?;
1501
1502    // Execute each pipeline stage
1503    for stage in &pipeline.stages {
1504        let aggregation = sqry_core::query::execute_pipeline_stage(&results, stage);
1505
1506        if cli.json {
1507            render_aggregation_json(streams, &aggregation)?;
1508        } else {
1509            streams.write_result(&format!("{aggregation}"))?;
1510        }
1511    }
1512
1513    Ok(())
1514}
1515
1516/// Render aggregation results as JSON.
1517fn render_aggregation_json(
1518    streams: &mut OutputStreams,
1519    aggregation: &sqry_core::query::pipeline::AggregationResult,
1520) -> Result<()> {
1521    use sqry_core::query::pipeline::AggregationResult;
1522    let json = match aggregation {
1523        AggregationResult::Count(r) => serde_json::json!({
1524            "type": "count",
1525            "total": r.total,
1526        }),
1527        AggregationResult::GroupBy(r) => serde_json::json!({
1528            "type": "group_by",
1529            "field": r.field,
1530            "groups": r.groups.iter().map(|(k, v)| serde_json::json!({"value": k, "count": v})).collect::<Vec<_>>(),
1531        }),
1532        AggregationResult::Top(r) => serde_json::json!({
1533            "type": "top",
1534            "field": r.field,
1535            "n": r.n,
1536            "entries": r.entries.iter().map(|(k, v)| serde_json::json!({"value": k, "count": v})).collect::<Vec<_>>(),
1537        }),
1538        AggregationResult::Stats(r) => serde_json::json!({
1539            "type": "stats",
1540            "total": r.total,
1541            "by_kind": r.by_kind.iter().map(|(k, v)| serde_json::json!({"value": k, "count": v})).collect::<Vec<_>>(),
1542            "by_lang": r.by_lang.iter().map(|(k, v)| serde_json::json!({"value": k, "count": v})).collect::<Vec<_>>(),
1543            "by_visibility": r.by_visibility.iter().map(|(k, v)| serde_json::json!({"value": k, "count": v})).collect::<Vec<_>>(),
1544        }),
1545    };
1546    streams.write_result(&json.to_string())?;
1547    Ok(())
1548}
1549
1550#[cfg(test)]
1551mod tests {
1552    use super::*;
1553    use sqry_core::relations::CallIdentityKind;
1554
1555    // ==========================================================================
1556    // u64_to_f64_lossy tests
1557    // ==========================================================================
1558
1559    #[test]
1560    fn test_u64_to_f64_lossy_zero() {
1561        assert!((u64_to_f64_lossy(0) - 0.0).abs() < f64::EPSILON);
1562    }
1563
1564    #[test]
1565    fn test_u64_to_f64_lossy_small_values() {
1566        assert!((u64_to_f64_lossy(1) - 1.0).abs() < f64::EPSILON);
1567        assert!((u64_to_f64_lossy(100) - 100.0).abs() < f64::EPSILON);
1568        assert!((u64_to_f64_lossy(1000) - 1000.0).abs() < f64::EPSILON);
1569    }
1570
1571    #[test]
1572    fn test_u64_to_f64_lossy_u32_max() {
1573        let u32_max = u64::from(u32::MAX);
1574        assert!((u64_to_f64_lossy(u32_max) - f64::from(u32::MAX)).abs() < f64::EPSILON);
1575    }
1576
1577    #[test]
1578    fn test_u64_to_f64_lossy_overflow_clamps_to_u32_max() {
1579        // Values larger than u32::MAX should clamp
1580        let large_value = u64::from(u32::MAX) + 1;
1581        assert!((u64_to_f64_lossy(large_value) - f64::from(u32::MAX)).abs() < f64::EPSILON);
1582    }
1583
1584    // ==========================================================================
1585    // format_cache_status tests
1586    // ==========================================================================
1587
1588    #[test]
1589    fn test_format_cache_status_full_hit() {
1590        let status = sqry_core::query::CacheStatus {
1591            parse_cache_hit: true,
1592            result_cache_hit: true,
1593        };
1594        assert_eq!(format_cache_status(&status), "HIT (100% cached)");
1595    }
1596
1597    #[test]
1598    fn test_format_cache_status_parse_hit_only() {
1599        let status = sqry_core::query::CacheStatus {
1600            parse_cache_hit: true,
1601            result_cache_hit: false,
1602        };
1603        assert_eq!(
1604            format_cache_status(&status),
1605            "PARTIAL HIT (query cached, results computed)"
1606        );
1607    }
1608
1609    #[test]
1610    fn test_format_cache_status_result_hit_only() {
1611        let status = sqry_core::query::CacheStatus {
1612            parse_cache_hit: false,
1613            result_cache_hit: true,
1614        };
1615        assert_eq!(
1616            format_cache_status(&status),
1617            "PARTIAL HIT (query parsed, results cached)"
1618        );
1619    }
1620
1621    #[test]
1622    fn test_format_cache_status_full_miss() {
1623        let status = sqry_core::query::CacheStatus {
1624            parse_cache_hit: false,
1625            result_cache_hit: false,
1626        };
1627        assert_eq!(format_cache_status(&status), "MISS (first run)");
1628    }
1629
1630    // ==========================================================================
1631    // format_execution_steps tests
1632    // ==========================================================================
1633
1634    #[test]
1635    fn test_format_execution_steps_empty() {
1636        let steps: Vec<sqry_core::query::ExecutionStep> = vec![];
1637        assert_eq!(format_execution_steps(&steps), "");
1638    }
1639
1640    #[test]
1641    fn test_format_execution_steps_single() {
1642        let steps = vec![sqry_core::query::ExecutionStep {
1643            step_num: 1,
1644            operation: "Parse query".to_string(),
1645            result_count: 0,
1646            time_ms: 5,
1647        }];
1648        assert_eq!(format_execution_steps(&steps), "  1. Parse query (5ms)");
1649    }
1650
1651    #[test]
1652    fn test_format_execution_steps_multiple() {
1653        let steps = vec![
1654            sqry_core::query::ExecutionStep {
1655                step_num: 1,
1656                operation: "Parse".to_string(),
1657                result_count: 100,
1658                time_ms: 2,
1659            },
1660            sqry_core::query::ExecutionStep {
1661                step_num: 2,
1662                operation: "Optimize".to_string(),
1663                result_count: 50,
1664                time_ms: 3,
1665            },
1666            sqry_core::query::ExecutionStep {
1667                step_num: 3,
1668                operation: "Execute".to_string(),
1669                result_count: 25,
1670                time_ms: 10,
1671            },
1672        ];
1673        let expected = "  1. Parse (2ms)\n  2. Optimize (3ms)\n  3. Execute (10ms)";
1674        assert_eq!(format_execution_steps(&steps), expected);
1675    }
1676
1677    // ==========================================================================
1678    // expr_has_repo_predicate tests
1679    // ==========================================================================
1680
1681    #[test]
1682    fn test_expr_has_repo_predicate_simple_repo() {
1683        let query = QueryParser::parse_query("repo:myrepo").unwrap();
1684        assert!(expr_has_repo_predicate(&query.root));
1685    }
1686
1687    #[test]
1688    fn test_expr_has_repo_predicate_no_repo() {
1689        let query = QueryParser::parse_query("kind:function").unwrap();
1690        assert!(!expr_has_repo_predicate(&query.root));
1691    }
1692
1693    #[test]
1694    fn test_expr_has_repo_predicate_nested_and() {
1695        let query = QueryParser::parse_query("kind:function AND repo:myrepo").unwrap();
1696        assert!(expr_has_repo_predicate(&query.root));
1697    }
1698
1699    #[test]
1700    fn test_expr_has_repo_predicate_nested_or() {
1701        let query = QueryParser::parse_query("kind:function OR repo:myrepo").unwrap();
1702        assert!(expr_has_repo_predicate(&query.root));
1703    }
1704
1705    #[test]
1706    fn test_expr_has_repo_predicate_nested_not() {
1707        let query = QueryParser::parse_query("NOT repo:myrepo").unwrap();
1708        assert!(expr_has_repo_predicate(&query.root));
1709    }
1710
1711    #[test]
1712    fn test_expr_has_repo_predicate_complex_no_repo() {
1713        let query = QueryParser::parse_query("kind:function AND name:foo OR lang:rust").unwrap();
1714        assert!(!expr_has_repo_predicate(&query.root));
1715    }
1716
1717    // ==========================================================================
1718    // RelationDisplayContext tests
1719    // ==========================================================================
1720
1721    #[test]
1722    fn test_relation_context_no_relations() {
1723        let ctx = RelationDisplayContext::from_query("kind:function");
1724        assert!(ctx.caller_targets.is_empty());
1725        assert!(ctx.callee_targets.is_empty());
1726    }
1727
1728    #[test]
1729    fn test_relation_context_with_callers() {
1730        let ctx = RelationDisplayContext::from_query("callers:foo");
1731        assert_eq!(ctx.caller_targets, vec!["foo"]);
1732        assert!(ctx.callee_targets.is_empty());
1733    }
1734
1735    #[test]
1736    fn test_relation_context_with_callees() {
1737        let ctx = RelationDisplayContext::from_query("callees:bar");
1738        assert!(ctx.caller_targets.is_empty());
1739        assert_eq!(ctx.callee_targets, vec!["bar"]);
1740    }
1741
1742    #[test]
1743    fn test_relation_context_with_both() {
1744        let ctx = RelationDisplayContext::from_query("callers:foo AND callees:bar");
1745        assert_eq!(ctx.caller_targets, vec!["foo"]);
1746        assert_eq!(ctx.callee_targets, vec!["bar"]);
1747    }
1748
1749    #[test]
1750    fn test_relation_context_invalid_query() {
1751        // Invalid queries should return default context
1752        let ctx = RelationDisplayContext::from_query("invalid query syntax ???");
1753        assert!(ctx.caller_targets.is_empty());
1754        assert!(ctx.callee_targets.is_empty());
1755    }
1756
1757    #[test]
1758    fn test_build_identity_from_qualified_name_preserves_ruby_instance_display() {
1759        let identity = build_identity_from_qualified_name(
1760            "Admin::Users::Controller::show",
1761            "method",
1762            Some("ruby"),
1763            false,
1764        )
1765        .expect("ruby instance identity");
1766
1767        assert_eq!(identity.qualified, "Admin::Users::Controller#show");
1768        assert_eq!(identity.method_kind, CallIdentityKind::Instance);
1769    }
1770
1771    #[test]
1772    fn test_build_identity_from_qualified_name_preserves_ruby_singleton_display() {
1773        let identity = build_identity_from_qualified_name(
1774            "Admin::Users::Controller::show",
1775            "method",
1776            Some("ruby"),
1777            true,
1778        )
1779        .expect("ruby singleton identity");
1780
1781        assert_eq!(identity.qualified, "Admin::Users::Controller.show");
1782        assert_eq!(identity.method_kind, CallIdentityKind::Singleton);
1783    }
1784
1785    // ==========================================================================
1786    // ensure_repo_predicate_not_present tests
1787    // ==========================================================================
1788
1789    #[test]
1790    fn test_ensure_repo_not_present_ok() {
1791        let result = ensure_repo_predicate_not_present("kind:function");
1792        assert!(result.is_ok());
1793    }
1794
1795    #[test]
1796    fn test_ensure_repo_not_present_fails_with_repo() {
1797        let result = ensure_repo_predicate_not_present("repo:myrepo");
1798        assert!(result.is_err());
1799        assert!(
1800            result
1801                .unwrap_err()
1802                .to_string()
1803                .contains("repo: filters are only supported")
1804        );
1805    }
1806
1807    #[test]
1808    fn test_ensure_repo_not_present_fails_with_nested_repo() {
1809        let result = ensure_repo_predicate_not_present("kind:function AND repo:myrepo");
1810        assert!(result.is_err());
1811    }
1812
1813    #[test]
1814    fn test_ensure_repo_not_present_fallback_text_check() {
1815        // Even if query doesn't parse, text-based check should work
1816        let result = ensure_repo_predicate_not_present("invalid??? repo:something");
1817        assert!(result.is_err());
1818    }
1819
1820    // ==========================================================================
1821    // parse_variable_args tests
1822    // ==========================================================================
1823
1824    #[test]
1825    fn test_parse_variable_args_empty() {
1826        let result = parse_variable_args(&[]).unwrap();
1827        assert!(result.is_empty());
1828    }
1829
1830    #[test]
1831    fn test_parse_variable_args_single_key_value() {
1832        let args = vec!["FOO=bar".to_string()];
1833        let result = parse_variable_args(&args).unwrap();
1834        assert_eq!(result.len(), 1);
1835        assert_eq!(result.get("FOO"), Some(&"bar".to_string()));
1836    }
1837
1838    #[test]
1839    fn test_parse_variable_args_multiple() {
1840        let args = vec!["A=1".to_string(), "B=hello world".to_string()];
1841        let result = parse_variable_args(&args).unwrap();
1842        assert_eq!(result.len(), 2);
1843        assert_eq!(result.get("A"), Some(&"1".to_string()));
1844        assert_eq!(result.get("B"), Some(&"hello world".to_string()));
1845    }
1846
1847    #[test]
1848    fn test_parse_variable_args_value_with_equals() {
1849        // Only the first '=' is the separator; rest is the value
1850        let args = vec!["KEY=val=ue".to_string()];
1851        let result = parse_variable_args(&args).unwrap();
1852        assert_eq!(result.get("KEY"), Some(&"val=ue".to_string()));
1853    }
1854
1855    #[test]
1856    fn test_parse_variable_args_no_equals_errors() {
1857        let args = vec!["NOEQUALS".to_string()];
1858        let err = parse_variable_args(&args).unwrap_err();
1859        assert!(
1860            err.to_string().contains("Invalid --var format"),
1861            "Unexpected error: {err}"
1862        );
1863    }
1864
1865    #[test]
1866    fn test_parse_variable_args_empty_key_errors() {
1867        let args = vec!["=value".to_string()];
1868        let err = parse_variable_args(&args).unwrap_err();
1869        assert!(
1870            err.to_string().contains("Variable name cannot be empty"),
1871            "Unexpected error: {err}"
1872        );
1873    }
1874
1875    #[test]
1876    fn test_parse_variable_args_empty_value_allowed() {
1877        let args = vec!["KEY=".to_string()];
1878        let result = parse_variable_args(&args).unwrap();
1879        assert_eq!(result.get("KEY"), Some(&String::new()));
1880    }
1881
1882    // ==========================================================================
1883    // is_join_query tests
1884    // ==========================================================================
1885
1886    #[test]
1887    fn test_is_join_query_non_join() {
1888        assert!(!is_join_query("kind:function"));
1889        assert!(!is_join_query("name:foo AND kind:method"));
1890    }
1891
1892    #[test]
1893    fn test_is_join_query_invalid_query_returns_false() {
1894        // parse errors → false, not panic
1895        assert!(!is_join_query("invalid ??? syntax {{{"));
1896    }
1897
1898    #[test]
1899    fn test_is_join_query_positive() {
1900        // A valid join expression uses the CALLS operator between two sub-queries.
1901        // The parser recognises `(lhs) CALLS (rhs)` as a Join expression.
1902        assert!(
1903            is_join_query("(kind:function) CALLS (kind:function)"),
1904            "CALLS join expression must be detected as a join query"
1905        );
1906    }
1907
1908    // ==========================================================================
1909    // detect_pipeline_query tests
1910    // ==========================================================================
1911
1912    #[test]
1913    fn test_detect_pipeline_query_no_pipe_returns_none() {
1914        let result = detect_pipeline_query("kind:function").unwrap();
1915        assert!(result.is_none());
1916    }
1917
1918    #[test]
1919    fn test_detect_pipeline_query_invalid_without_pipe_returns_none() {
1920        // No pipe → even parse errors silently return None
1921        let result = detect_pipeline_query("invalid query !!!").unwrap();
1922        assert!(result.is_none());
1923    }
1924
1925    #[test]
1926    fn test_detect_pipeline_query_invalid_with_pipe_errors() {
1927        // A well-formed pipeline query (base `|` valid stage) must return Ok.
1928        // The presence of `|` only turns parse *errors* into hard errors; a
1929        // successful parse must always return Ok(Some(_)).
1930        let result = detect_pipeline_query("kind:function | count");
1931        assert!(
1932            result.is_ok(),
1933            "A valid pipeline query must return Ok, got: {result:?}"
1934        );
1935        assert!(
1936            result.unwrap().is_some(),
1937            "A valid pipeline query must return Ok(Some(_))"
1938        );
1939    }
1940
1941    // ==========================================================================
1942    // apply_symbol_limit tests
1943    // ==========================================================================
1944
1945    #[test]
1946    fn test_apply_symbol_limit_no_truncation() {
1947        let mut symbols: Vec<DisplaySymbol> = (0..5)
1948            .map(|i| DisplaySymbol {
1949                name: format!("sym{i}"),
1950                qualified_name: format!("sym{i}"),
1951                kind: "function".to_string(),
1952                file_path: std::path::PathBuf::from("a.rs"),
1953                start_line: i,
1954                start_column: 0,
1955                end_line: i,
1956                end_column: 0,
1957                metadata: std::collections::HashMap::new(),
1958                caller_identity: None,
1959                callee_identity: None,
1960            })
1961            .collect();
1962
1963        let info = apply_symbol_limit(&mut symbols, 10);
1964        assert_eq!(symbols.len(), 5);
1965        assert!(!info.truncated);
1966        assert_eq!(info.total_matches, 5);
1967        assert_eq!(info.limit, 10);
1968    }
1969
1970    #[test]
1971    fn test_apply_symbol_limit_truncates() {
1972        let mut symbols: Vec<DisplaySymbol> = (0..20)
1973            .map(|i| DisplaySymbol {
1974                name: format!("sym{i}"),
1975                qualified_name: format!("sym{i}"),
1976                kind: "function".to_string(),
1977                file_path: std::path::PathBuf::from("a.rs"),
1978                start_line: i,
1979                start_column: 0,
1980                end_line: i,
1981                end_column: 0,
1982                metadata: std::collections::HashMap::new(),
1983                caller_identity: None,
1984                callee_identity: None,
1985            })
1986            .collect();
1987
1988        let info = apply_symbol_limit(&mut symbols, 5);
1989        assert_eq!(symbols.len(), 5);
1990        assert!(info.truncated);
1991        assert_eq!(info.total_matches, 20);
1992        assert_eq!(info.limit, 5);
1993    }
1994
1995    #[test]
1996    fn test_apply_symbol_limit_exact_boundary() {
1997        let mut symbols: Vec<DisplaySymbol> = (0..5)
1998            .map(|i| DisplaySymbol {
1999                name: format!("sym{i}"),
2000                qualified_name: format!("sym{i}"),
2001                kind: "function".to_string(),
2002                file_path: std::path::PathBuf::from("a.rs"),
2003                start_line: i,
2004                start_column: 0,
2005                end_line: i,
2006                end_column: 0,
2007                metadata: std::collections::HashMap::new(),
2008                caller_identity: None,
2009                callee_identity: None,
2010            })
2011            .collect();
2012
2013        let info = apply_symbol_limit(&mut symbols, 5);
2014        assert_eq!(symbols.len(), 5);
2015        assert!(!info.truncated, "Exact boundary should not truncate");
2016    }
2017
2018    // ==========================================================================
2019    // u64_to_f64_lossy additional edge cases
2020    // ==========================================================================
2021
2022    #[test]
2023    fn test_u64_to_f64_lossy_large_values_clamp_to_u32_max() {
2024        let very_large = u64::MAX;
2025        let result = u64_to_f64_lossy(very_large);
2026        // Should clamp to u32::MAX
2027        assert!((result - f64::from(u32::MAX)).abs() < f64::EPSILON);
2028    }
2029
2030    // ==========================================================================
2031    // env_debug_cache_enabled tests
2032    // ==========================================================================
2033
2034    #[serial_test::serial]
2035    #[test]
2036    fn test_env_debug_cache_disabled_by_default() {
2037        // In a clean test environment, SQRY_CACHE_DEBUG should not be set
2038        // (if it is set externally, we skip this test)
2039        unsafe {
2040            std::env::remove_var("SQRY_CACHE_DEBUG");
2041        }
2042        assert!(!env_debug_cache_enabled());
2043    }
2044
2045    #[serial_test::serial]
2046    #[test]
2047    fn test_env_debug_cache_enabled_with_1() {
2048        unsafe {
2049            std::env::set_var("SQRY_CACHE_DEBUG", "1");
2050        }
2051        let result = env_debug_cache_enabled();
2052        unsafe {
2053            std::env::remove_var("SQRY_CACHE_DEBUG");
2054        }
2055        assert!(result);
2056    }
2057
2058    #[serial_test::serial]
2059    #[test]
2060    fn test_env_debug_cache_enabled_with_true() {
2061        unsafe {
2062            std::env::set_var("SQRY_CACHE_DEBUG", "true");
2063        }
2064        let result = env_debug_cache_enabled();
2065        unsafe {
2066            std::env::remove_var("SQRY_CACHE_DEBUG");
2067        }
2068        assert!(result);
2069    }
2070
2071    #[serial_test::serial]
2072    #[test]
2073    fn test_env_debug_cache_enabled_with_true_uppercase() {
2074        unsafe {
2075            std::env::set_var("SQRY_CACHE_DEBUG", "TRUE");
2076        }
2077        let result = env_debug_cache_enabled();
2078        unsafe {
2079            std::env::remove_var("SQRY_CACHE_DEBUG");
2080        }
2081        assert!(result);
2082    }
2083
2084    #[serial_test::serial]
2085    #[test]
2086    fn test_env_debug_cache_disabled_with_zero() {
2087        unsafe {
2088            std::env::set_var("SQRY_CACHE_DEBUG", "0");
2089        }
2090        let result = env_debug_cache_enabled();
2091        unsafe {
2092            std::env::remove_var("SQRY_CACHE_DEBUG");
2093        }
2094        assert!(!result);
2095    }
2096
2097    // ==========================================================================
2098    // build_query_stats tests
2099    // ==========================================================================
2100
2101    #[test]
2102    fn test_build_query_stats_with_index() {
2103        let stats = build_query_stats(true, 10);
2104        assert!(stats.used_index);
2105    }
2106
2107    #[test]
2108    fn test_build_query_stats_without_index() {
2109        let stats = build_query_stats(false, 10);
2110        assert!(!stats.used_index);
2111    }
2112}