Skip to main content

velesdb_core/collection/search/query/
mod.rs

1//! VelesQL query execution for Collection.
2//!
3//! This module orchestrates query execution by combining:
4//! - Query validation (`validation.rs`)
5//! - Condition extraction (`extraction.rs`)
6//! - ORDER BY processing (`ordering.rs`)
7//!
8//! # Future Enhancement: HybridExecutionPlan Integration
9//!
10//! The `HybridExecutionPlan` and `choose_hybrid_strategy()` in `planner.rs`
11//! are ready for integration to optimize query execution based on:
12//! - Query pattern (ORDER BY similarity, filters, etc.)
13//! - Runtime statistics (latency, selectivity)
14//! - Over-fetch factor for filtered queries
15//!
16//! Future: Integrate `QueryPlanner::choose_hybrid_strategy()` into `execute_query()`
17//! to leverage cost-based optimization for complex queries.
18
19#![allow(clippy::uninlined_format_args)] // Prefer readability in query error paths.
20#![allow(clippy::implicit_hasher)] // HashSet hasher genericity adds noise for internal APIs.
21
22mod aggregation;
23mod distinct;
24mod execution_paths;
25mod extraction;
26#[cfg(test)]
27mod extraction_tests;
28mod hybrid_sparse;
29#[cfg(test)]
30mod hybrid_sparse_tests;
31pub mod join;
32#[cfg(test)]
33mod join_tests;
34pub mod match_exec;
35#[cfg(test)]
36mod match_exec_tests;
37pub mod match_metrics;
38#[cfg(test)]
39mod match_metrics_tests;
40pub mod match_planner;
41#[cfg(test)]
42mod match_planner_tests;
43mod multi_vector;
44mod ordering;
45pub mod parallel_traversal;
46#[cfg(test)]
47mod parallel_traversal_tests;
48pub mod pushdown;
49#[cfg(test)]
50mod pushdown_tests;
51pub mod score_fusion;
52#[cfg(test)]
53mod score_fusion_tests;
54mod similarity_filter;
55mod union_query;
56mod validation;
57mod where_eval;
58
59// Re-export for potential external use
60#[allow(unused_imports)]
61pub use ordering::compare_json_values;
62// Re-export join functions for future integration with execute_query
63#[allow(unused_imports)]
64pub use join::{execute_join, JoinedResult};
65
66use crate::collection::types::Collection;
67use crate::error::Result;
68use crate::point::SearchResult;
69use std::collections::HashSet;
70
/// Hard ceiling for any requested `LIMIT`.
///
/// Requested limits above this value, or values that cannot be represented as
/// `usize`, are clamped to it so that over-fetch calculations cannot overflow.
const MAX_LIMIT: usize = 100 * 1_000;
73
74impl Collection {
75    /// Executes a `VelesQL` query on this collection with the `"default"` client id.
76    ///
77    /// This method unifies vector search, text search, and metadata filtering
78    /// into a single interface. For per-client rate limiting use
79    /// [`execute_query_with_client`](Self::execute_query_with_client).
80    ///
81    /// # Errors
82    ///
83    /// Returns an error if the query cannot be executed (e.g., missing parameters).
84    pub fn execute_query(
85        &self,
86        query: &crate::velesql::Query,
87        params: &std::collections::HashMap<String, serde_json::Value>,
88    ) -> Result<Vec<SearchResult>> {
89        self.execute_query_with_client(query, params, "default")
90    }
91
92    /// Executes a `VelesQL` query with a specific client identifier for per-client rate limiting.
93    ///
94    /// Each distinct `client_id` maintains an independent token bucket, so one
95    /// busy client cannot exhaust the quota of another.
96    ///
97    /// # Errors
98    ///
99    /// Returns an error if the query cannot be executed or a guard-rail fires.
100    #[allow(clippy::too_many_lines)] // Complex dispatch logic - refactoring planned
101    pub fn execute_query_with_client(
102        &self,
103        query: &crate::velesql::Query,
104        params: &std::collections::HashMap<String, serde_json::Value>,
105        client_id: &str,
106    ) -> Result<Vec<SearchResult>> {
107        // Guard-rail pre-checks: circuit breaker + rate limiting (EPIC-048).
108        self.guard_rails
109            .pre_check(client_id)
110            .map_err(crate::error::Error::from)?;
111
112        // Create per-query execution context for timeout + cardinality tracking.
113        let ctx = self.guard_rails.create_context();
114
115        crate::velesql::QueryValidator::validate(query)
116            .map_err(|e| crate::error::Error::Query(e.to_string()))?;
117
118        // Unified VelesQL dispatch: allow Collection::execute_query() to run top-level MATCH queries.
119        if let Some(match_clause) = query.match_clause.as_ref() {
120            let match_results =
121                self.execute_match_with_context(match_clause, params, Some(&ctx))?;
122
123            // Check timeout after potentially expensive traversal.
124            ctx.check_timeout()
125                .map_err(crate::error::Error::from)
126                .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
127
128            let mut sorted = match_results;
129            if let Some(order_by) = match_clause.return_clause.order_by.as_ref() {
130                for item in order_by.iter().rev() {
131                    self.order_match_results(&mut sorted, &item.expression, item.descending);
132                }
133            }
134
135            let mut results = self
136                .match_results_to_search_results(sorted)
137                .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
138            // Final cardinality check for MATCH path (EPIC-048 US-003).
139            // execute_match_with_context only checks cardinality periodically every 100
140            // traversal iterations. A query with <100 iterations that produces many results
141            // (e.g., high fan-out from a single start node) bypasses the periodic check.
142            // This explicit check on the final result set closes that gap.
143            ctx.check_cardinality(results.len())
144                .map_err(crate::error::Error::from)
145                .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
146            if let Some(limit) = match_clause.return_clause.limit {
147                let limit = usize::try_from(limit).unwrap_or(MAX_LIMIT).min(MAX_LIMIT);
148                results.truncate(limit);
149            }
150            // Update QueryPlanner adaptive stats for graph/MATCH queries (Fix #8).
151            // Reason: u128->u64 cast; query durations < u64::MAX µs (~585 millennia)
152            #[allow(clippy::cast_possible_truncation)]
153            let graph_latency_us = ctx.elapsed().as_micros() as u64;
154            self.query_planner
155                .stats()
156                .update_graph_latency(graph_latency_us);
157            self.guard_rails.circuit_breaker.record_success();
158            return Ok(results);
159        }
160
161        let stmt = &query.select;
162        // Cap limit to prevent overflow in over-fetch calculations
163        let limit = usize::try_from(stmt.limit.unwrap_or(10))
164            .unwrap_or(MAX_LIMIT)
165            .min(MAX_LIMIT);
166
167        // 1. Extract vector search (NEAR) or similarity() conditions if present
168        let mut vector_search = None;
169        let mut similarity_conditions: Vec<(String, Vec<f32>, crate::velesql::CompareOp, f64)> =
170            Vec::new();
171        let mut filter_condition = None;
172        let mut graph_match_predicates = Vec::new();
173
174        // EPIC-044 US-002: Check for similarity() OR metadata pattern (union mode)
175        let is_union_query = if let Some(ref cond) = stmt.where_clause {
176            Self::has_similarity_in_problematic_or(cond)
177        } else {
178            false
179        };
180
181        // EPIC-044 US-003: Check for NOT similarity() pattern (scan mode)
182        let is_not_similarity_query = if let Some(ref cond) = stmt.where_clause {
183            Self::has_similarity_under_not(cond)
184        } else {
185            false
186        };
187
188        // Extract sparse vector search condition (Phase 5 hybrid support).
189        let mut sparse_vector_search = None;
190
191        if let Some(ref cond) = stmt.where_clause {
192            // Validate query structure before extraction
193            Self::validate_similarity_query_structure(cond)?;
194            Self::collect_graph_match_predicates(cond, &mut graph_match_predicates);
195
196            // Check for SPARSE_NEAR before cloning for dense extraction.
197            sparse_vector_search = Self::extract_sparse_vector_search(cond).cloned();
198
199            // Reason: extract_vector_search mutates the condition in-place to remove the NEAR node;
200            // the original cond is still needed for similarity/filter extraction below.
201            // Clone is unavoidable until extract_vector_search returns a new condition instead.
202            let mut extracted_cond = cond.clone();
203            vector_search = self.extract_vector_search(&mut extracted_cond, params)?;
204            // EPIC-044 US-001: Extract ALL similarity conditions for cascade filtering
205            similarity_conditions =
206                self.extract_all_similarity_conditions(&extracted_cond, params)?;
207            filter_condition = Some(extracted_cond);
208
209            // NEAR + similarity() is supported: NEAR finds candidates, similarity() filters by threshold
210            // Multiple similarity() with AND is supported: filters applied sequentially (cascade)
211        }
212
213        // 2. Resolve WITH clause options
214        let mut ef_search = None;
215        if let Some(ref with) = stmt.with_clause {
216            ef_search = with.get_ef_search();
217        }
218
219        // Get first similarity condition for initial search (if any)
220        let first_similarity = similarity_conditions.first().cloned();
221        let has_graph_predicates = !graph_match_predicates.is_empty();
222        let skip_metadata_prefilter_for_graph_or = has_graph_predicates
223            && stmt
224                .where_clause
225                .as_ref()
226                .is_some_and(Self::condition_contains_or);
227        let execution_limit = if has_graph_predicates {
228            MAX_LIMIT
229        } else {
230            limit
231        };
232
233        // 3a. CBO: Choose execution strategy via QueryPlanner (EPIC-046).
234        //
235        // Uses choose_strategy_with_cbo_and_overfetch() which returns both the strategy
236        // and the selectivity-derived over-fetch factor in a single pass.
237        let (cbo_strategy, cbo_over_fetch) = {
238            let col_stats = self.get_stats();
239            self.query_planner.choose_strategy_with_cbo_and_overfetch(
240                &col_stats,
241                filter_condition.as_ref(),
242                limit,
243            )
244        };
245        tracing::debug!(
246            strategy = ?cbo_strategy,
247            over_fetch = cbo_over_fetch,
248            "CBO selected execution strategy"
249        );
250
251        // 3. Execute query based on extracted components
252        // EPIC-044 US-003: NOT similarity() requires full scan
253        if is_not_similarity_query {
254            if let Some(ref cond) = stmt.where_clause {
255                let mut results = self
256                    .execute_not_similarity_query(cond, params, execution_limit)
257                    .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
258                if has_graph_predicates {
259                    results = self
260                        .apply_where_condition_to_results(results, cond, params, &stmt.from_alias)
261                        .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
262                }
263
264                // Apply ORDER BY if present
265                if let Some(ref order_by) = stmt.order_by {
266                    self.apply_order_by(&mut results, order_by, params)
267                        .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
268                }
269                results.truncate(limit);
270                // Guard-rail checks for early-return paths (EPIC-048 US-001, US-003).
271                // These paths return before the main-path checks at the bottom of execute_query.
272                // NOT similarity() is a full table scan — timeout and cardinality MUST be enforced.
273                ctx.check_timeout()
274                    .map_err(crate::error::Error::from)
275                    .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
276                ctx.check_cardinality(results.len())
277                    .map_err(crate::error::Error::from)
278                    .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
279                self.guard_rails.circuit_breaker.record_success();
280                return Ok(results);
281            }
282        }
283
284        // EPIC-044 US-002: Union mode for similarity() OR metadata
285        if is_union_query {
286            if let Some(ref cond) = stmt.where_clause {
287                let mut results = self
288                    .execute_union_query(cond, params, execution_limit)
289                    .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
290                if has_graph_predicates {
291                    results = self
292                        .apply_where_condition_to_results(results, cond, params, &stmt.from_alias)
293                        .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
294                }
295
296                // Apply ORDER BY if present
297                if let Some(ref order_by) = stmt.order_by {
298                    self.apply_order_by(&mut results, order_by, params)
299                        .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
300                }
301                results.truncate(limit);
302                // Guard-rail checks for early-return paths (EPIC-048 US-001, US-003).
303                // Union queries return here without reaching the main-path guard-rail checks.
304                ctx.check_timeout()
305                    .map_err(crate::error::Error::from)
306                    .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
307                ctx.check_cardinality(results.len())
308                    .map_err(crate::error::Error::from)
309                    .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
310                self.guard_rails.circuit_breaker.record_success();
311                return Ok(results);
312            }
313        }
314
315        // Phase 5: Sparse-only or hybrid dense+sparse execution.
316        if let Some(ref svs) = sparse_vector_search {
317            let mut results = if let Some(ref dense_vec) = vector_search {
318                // Hybrid: dense NEAR + SPARSE_NEAR -> parallel execution + fusion.
319                let fusion_strategy = stmt.fusion_clause.as_ref().map_or_else(
320                    crate::fusion::FusionStrategy::rrf_default,
321                    |fc| {
322                        use crate::velesql::FusionStrategyType;
323                        match fc.strategy {
324                            FusionStrategyType::Rsf => {
325                                let dw = fc.dense_weight.unwrap_or(0.5);
326                                let sw = fc.sparse_weight.unwrap_or(0.5);
327                                crate::fusion::FusionStrategy::relative_score(dw, sw)
328                                    .unwrap_or_else(|_| {
329                                        crate::fusion::FusionStrategy::rrf_default()
330                                    })
331                            }
332                            FusionStrategyType::Rrf => crate::fusion::FusionStrategy::RRF {
333                                k: fc.k.unwrap_or(60),
334                            },
335                            _ => crate::fusion::FusionStrategy::rrf_default(),
336                        }
337                    },
338                );
339                self.execute_hybrid_search_with_strategy(
340                    dense_vec,
341                    svs,
342                    params,
343                    filter_condition.as_ref(),
344                    execution_limit,
345                    &fusion_strategy,
346                )
347                .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?
348            } else {
349                // Sparse-only: no dense NEAR.
350                self.execute_sparse_search(svs, params, filter_condition.as_ref(), execution_limit)
351                    .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?
352            };
353
354            if has_graph_predicates {
355                if let Some(cond) = stmt.where_clause.as_ref() {
356                    results = self
357                        .apply_where_condition_to_results(results, cond, params, &stmt.from_alias)
358                        .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
359                }
360            }
361
362            ctx.check_timeout()
363                .map_err(crate::error::Error::from)
364                .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
365            ctx.check_cardinality(results.len())
366                .map_err(crate::error::Error::from)
367                .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
368
369            if stmt.distinct == crate::velesql::DistinctMode::All {
370                results = distinct::apply_distinct(results, &stmt.columns);
371            }
372            if let Some(ref order_by) = stmt.order_by {
373                self.apply_order_by(&mut results, order_by, params)?;
374            }
375            results.truncate(limit);
376            self.guard_rails.circuit_breaker.record_success();
377            return Ok(results);
378        }
379
380        // EPIC-044 US-001: Support multiple similarity() with AND (cascade filtering).
381        // Dispatch delegated to dispatch_vector_query() in execution_paths.rs.
382        let mut results = self
383            .dispatch_vector_query(
384                vector_search.as_ref(),
385                first_similarity.as_ref(),
386                &similarity_conditions,
387                filter_condition.as_ref(),
388                execution_limit,
389                skip_metadata_prefilter_for_graph_or,
390                ef_search,
391                cbo_strategy,
392                cbo_over_fetch,
393            )
394            .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
395
396        if has_graph_predicates {
397            if let Some(cond) = stmt.where_clause.as_ref() {
398                results = self
399                    .apply_where_condition_to_results(results, cond, params, &stmt.from_alias)
400                    .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
401            }
402        }
403
404        // P1-B: Filter pushdown analysis for JOIN queries (EPIC-031 US-006).
405        //
406        // When the query has JOIN clauses, analyze WHERE conditions to classify filters:
407        // - Graph/payload filters → already applied above
408        // - ColumnStore filters → logged for future cross-store JOIN execution
409        // - Post-JOIN filters → deferred (require cross-store data)
410        //
411        // This wires up `analyze_for_pushdown` to activate the classification,
412        // enabling the future ColumnStore JOIN executor to consume the analysis.
413        if !stmt.joins.is_empty() {
414            if let Some(ref cond) = stmt.where_clause {
415                // BUG-8 fix: from_alias is now Vec<String> containing all aliases
416                // visible in scope (FROM alias + JOIN aliases).
417                let graph_vars: std::collections::HashSet<String> =
418                    stmt.from_alias.iter().cloned().collect();
419                let join_tables = pushdown::extract_join_tables(&stmt.joins);
420                let analysis = pushdown::analyze_for_pushdown(cond, &graph_vars, &join_tables);
421                tracing::debug!(
422                    column_store_filters = analysis.column_store_filters.len(),
423                    graph_filters = analysis.graph_filters.len(),
424                    post_join_filters = analysis.post_join_filters.len(),
425                    has_pushdown = analysis.has_pushdown(),
426                    "JOIN pushdown analysis complete"
427                );
428            }
429        }
430
431        // Check timeout after all search/filter operations (EPIC-048 US-001).
432        ctx.check_timeout()
433            .map_err(crate::error::Error::from)
434            .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
435
436        // Check cardinality on final result set (EPIC-048 US-003).
437        // check_cardinality uses fetch_add internally; since this is the only call on this
438        // ctx for the SELECT path, passing results.len() as the delta is correct.
439        ctx.check_cardinality(results.len())
440            .map_err(crate::error::Error::from)
441            .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
442
443        // EPIC-052 US-001: Apply DISTINCT deduplication if requested
444        if stmt.distinct == crate::velesql::DistinctMode::All {
445            results = distinct::apply_distinct(results, &stmt.columns);
446        }
447
448        // Apply ORDER BY if present
449        if let Some(ref order_by) = stmt.order_by {
450            self.apply_order_by(&mut results, order_by, params)?;
451        }
452
453        // Apply limit
454        results.truncate(limit);
455
456        // Update QueryPlanner adaptive stats for vector/SELECT queries (Fix #8).
457        // Only record vector latency when a NEAR search was performed.
458        if vector_search.is_some() {
459            // Reason: u128->u64 cast; query durations < u64::MAX µs (~585 millennia)
460            #[allow(clippy::cast_possible_truncation)]
461            let vector_latency_us = ctx.elapsed().as_micros() as u64;
462            self.query_planner
463                .stats()
464                .update_vector_latency(vector_latency_us);
465        }
466        self.guard_rails.circuit_breaker.record_success();
467        Ok(results)
468    }
469
470    /// Parses and executes a VelesQL query string, using the collection-level parse cache (P1-A).
471    ///
472    /// Equivalent to calling `Parser::parse(sql)` followed by `execute_query()`, but caches
473    /// parsed ASTs so repeated identical queries avoid re-parsing overhead.
474    ///
475    /// # Arguments
476    ///
477    /// * `sql` - Raw VelesQL query string
478    /// * `params` - Query parameters for resolving placeholders (e.g., `$v`)
479    ///
480    /// # Errors
481    ///
482    /// Returns a parse error if `sql` is invalid, or an execution error if the query fails.
483    pub fn execute_query_str(
484        &self,
485        sql: &str,
486        params: &std::collections::HashMap<String, serde_json::Value>,
487    ) -> Result<Vec<SearchResult>> {
488        let query = self
489            .query_cache
490            .parse(sql)
491            .map_err(|e| crate::error::Error::Query(e.to_string()))?;
492        self.execute_query(&query, params)
493    }
494
495    // NOTE: apply_distinct and compute_distinct_key moved to distinct.rs
496    // (EPIC-061/US-003 refactoring)
497
498    // NOTE: filter_by_similarity, execute_not_similarity_query, extract_not_similarity_condition,
499    // execute_scan_query moved to similarity_filter.rs (Plan 04-04)
500
501    // NOTE: execute_union_query, matches_metadata_filter, split_or_condition_with_outer_filter
502    // moved to union_query.rs (Plan 04-04)
503}