Skip to main content

velesdb_core/collection/search/query/
mod.rs

1//! VelesQL query execution for Collection.
2//!
3//! This module orchestrates query execution by combining:
4//! - Query validation (`validation.rs`)
5//! - Condition extraction (`extraction.rs`)
6//! - ORDER BY processing (`ordering.rs`)
7//!
8//! # Future Enhancement: HybridExecutionPlan Integration
9//!
10//! The `HybridExecutionPlan` and `choose_hybrid_strategy()` in `planner.rs`
11//! are ready for integration to optimize query execution based on:
12//! - Query pattern (ORDER BY similarity, filters, etc.)
13//! - Runtime statistics (latency, selectivity)
14//! - Over-fetch factor for filtered queries
15//!
16//! Future: Integrate `QueryPlanner::choose_hybrid_strategy()` into `execute_query()`
17//! to leverage cost-based optimization for complex queries.
18
19#![allow(clippy::uninlined_format_args)] // Prefer readability in query error paths.
20#![allow(clippy::implicit_hasher)] // HashSet hasher genericity adds noise for internal APIs.
21
22mod aggregation;
23pub(crate) mod condition_tree;
24mod distinct;
25#[cfg(test)]
26mod distinct_tests;
27mod execution_paths;
28mod extraction;
29#[cfg(test)]
30mod extraction_tests;
31mod hybrid_sparse;
32#[cfg(test)]
33mod hybrid_sparse_tests;
34pub mod join;
35#[cfg(test)]
36mod join_tests;
37pub mod match_exec;
38#[cfg(test)]
39mod match_exec_tests;
40pub mod match_metrics;
41#[cfg(test)]
42mod match_metrics_tests;
43pub mod match_planner;
44#[cfg(test)]
45mod match_planner_tests;
46mod multi_vector;
47#[cfg(test)]
48mod multi_vector_tests;
49mod ordering;
50#[cfg(test)]
51mod ordering_tests;
52pub mod parallel_traversal;
53#[cfg(test)]
54mod parallel_traversal_tests;
55pub mod projection;
56pub mod pushdown;
57#[cfg(test)]
58mod pushdown_tests;
59pub mod score_fusion;
60#[cfg(test)]
61mod score_fusion_tests;
62mod select_dispatch;
63pub(crate) mod set_operations;
64mod similarity_filter;
65mod sparse_dispatch;
66mod union_query;
67mod validation;
68mod where_eval;
69
70// Re-export for potential external use
71#[allow(unused_imports)]
72pub use ordering::compare_json_values;
73// Re-export join functions for future integration with execute_query
74#[allow(unused_imports)]
75pub use join::{execute_join, JoinedResult};
76
77use crate::collection::types::Collection;
78use crate::error::Result;
79use crate::point::SearchResult;
80use std::collections::HashSet;
81
/// Maximum allowed LIMIT value to prevent overflow in over-fetch calculations.
///
/// Within this module it is used in three places: user-supplied LIMIT values
/// are clamped to it, each operand of a compound query is executed with it as
/// its limit, and early-return paths that post-filter on graph predicates use
/// it as their execution limit.
const MAX_LIMIT: usize = 100_000;
84
/// Context for early-return query paths (NOT-similarity, union).
///
/// Bundles the borrowed inputs that `execute_early_return_query` needs for
/// post-processing so they travel as a single argument.
struct EarlyReturnCtx<'a> {
    /// SELECT statement being executed; supplies `from_alias` and `order_by`.
    stmt: &'a crate::velesql::SelectStatement,
    /// Query parameters, forwarded to WHERE re-evaluation and ORDER BY.
    params: &'a std::collections::HashMap<String, serde_json::Value>,
    /// The statement's WHERE condition; re-applied to the results when
    /// `has_graph_predicates` is set.
    cond: &'a crate::velesql::Condition,
    /// `true` when at least one graph match predicate was collected from the
    /// WHERE clause.
    has_graph_predicates: bool,
    /// Effective LIMIT; results are truncated to this length after ordering.
    limit: usize,
    /// Per-query guard-rail context used for the timeout and cardinality checks.
    ctx: &'a crate::guardrails::QueryContext,
}
94
/// Extracted query components from the WHERE clause.
///
/// Built by `extract_query_components`. When the statement has no WHERE
/// clause every field keeps its empty value (`None`, empty `Vec`, `false`).
struct ExtractedComponents {
    /// Result of `extract_vector_search`, if it produced a query vector.
    vector_search: Option<Vec<f32>>,
    /// Output of `extract_all_similarity_conditions`.
    // NOTE(review): tuple presumably is (field, query vector, operator, threshold) — confirm in extraction.rs.
    similarity_conditions: Vec<(String, Vec<f32>, crate::velesql::CompareOp, f64)>,
    /// Clone of the WHERE condition after `extract_vector_search` was given
    /// mutable access to it.
    filter_condition: Option<crate::velesql::Condition>,
    /// Graph match predicates gathered by `collect_graph_match_predicates`.
    graph_match_predicates: Vec<crate::velesql::GraphMatchPredicate>,
    /// Sparse vector search cloned out of the condition, if one is present.
    sparse_vector_search: Option<crate::velesql::SparseVectorSearch>,
    /// Set when `has_similarity_in_problematic_or` holds for the WHERE clause;
    /// routes execution to the union early-return path.
    is_union_query: bool,
    /// Set when `has_similarity_under_not` holds for the WHERE clause;
    /// routes execution to the NOT-similarity early-return path.
    is_not_similarity_query: bool,
}
105
106impl Collection {
107    /// Executes a `VelesQL` query on this collection with the `"default"` client id.
108    ///
109    /// This method unifies vector search, text search, and metadata filtering
110    /// into a single interface. Compound queries (`UNION`, `INTERSECT`, `EXCEPT`)
111    /// are resolved here before delegation. For per-client rate limiting use
112    /// [`execute_query_with_client`](Self::execute_query_with_client).
113    ///
114    /// # Errors
115    ///
116    /// Returns an error if the query cannot be executed (e.g., missing parameters).
117    pub fn execute_query(
118        &self,
119        query: &crate::velesql::Query,
120        params: &std::collections::HashMap<String, serde_json::Value>,
121    ) -> Result<Vec<SearchResult>> {
122        // EPIC-040 US-006: For compound queries, execute each operand without the
123        // outer LIMIT so the set operation sees the full result sets.  The final
124        // LIMIT is applied once on the merged output (SQL-standard behaviour).
125        // Use MAX_LIMIT (not None) to avoid the default-10 cap in execute_query_with_client.
126        let compound_limit = Some(u64::try_from(MAX_LIMIT).unwrap_or(u64::MAX));
127        let left_results = if query.compound.is_some() {
128            let mut left_query = query.clone();
129            left_query.select.limit = compound_limit;
130            left_query.compound = None;
131            self.execute_query_with_client(&left_query, params, "default")?
132        } else {
133            return self.execute_query_with_client(query, params, "default");
134        };
135
136        // compound is guaranteed Some here (non-compound returns above).
137        if let Some(ref compound) = query.compound {
138            let mut accumulated = left_results;
139            for (operator, right_select) in &compound.operations {
140                let mut right_query = crate::velesql::Query::new_select(right_select.clone());
141                right_query.select.limit = compound_limit;
142                let right_results =
143                    self.execute_query_with_client(&right_query, params, "default")?;
144                accumulated =
145                    set_operations::apply_set_operation(accumulated, right_results, *operator);
146            }
147            // SQL-standard: LIMIT from the left (outer) SELECT applies to the final result.
148            if let Some(limit) = query.select.limit {
149                accumulated.truncate(usize::try_from(limit).unwrap_or(usize::MAX));
150            }
151            return Ok(accumulated);
152        }
153
154        Ok(left_results)
155    }
156
157    /// Executes a `VelesQL` query with a specific client identifier for per-client rate limiting.
158    ///
159    /// Each distinct `client_id` maintains an independent token bucket, so one
160    /// busy client cannot exhaust the quota of another.
161    ///
162    /// # Errors
163    ///
164    /// Returns an error if the query cannot be executed or a guard-rail fires.
165    pub fn execute_query_with_client(
166        &self,
167        query: &crate::velesql::Query,
168        params: &std::collections::HashMap<String, serde_json::Value>,
169        client_id: &str,
170    ) -> Result<Vec<SearchResult>> {
171        // Guard-rail pre-checks: circuit breaker + rate limiting (EPIC-048).
172        self.guard_rails
173            .pre_check(client_id)
174            .map_err(crate::error::Error::from)?;
175
176        // Create per-query execution context for timeout + cardinality tracking.
177        let ctx = self.guard_rails.create_context();
178
179        crate::velesql::QueryValidator::validate(query)
180            .map_err(|e| crate::error::Error::Query(e.to_string()))?;
181
182        // Unified VelesQL dispatch: allow Collection::execute_query() to run top-level MATCH queries.
183        if let Some(match_clause) = query.match_clause.as_ref() {
184            return self.dispatch_match_query(match_clause, params, &ctx);
185        }
186
187        let stmt = &query.select;
188        let limit = usize::try_from(stmt.limit.unwrap_or(10))
189            .unwrap_or(MAX_LIMIT)
190            .min(MAX_LIMIT);
191
192        let extracted = self.extract_query_components(stmt, params)?;
193
194        // Early-return paths for special query shapes.
195        if let Some(results) = self.try_early_return_path(stmt, params, &extracted, limit, &ctx)? {
196            return Ok(results);
197        }
198
199        // Main vector/similarity/metadata dispatch path.
200        let mut results = self.dispatch_main_select(stmt, params, &extracted, limit, &ctx)?;
201
202        // JOIN pushdown analysis (EPIC-031 US-006).
203        self.analyze_join_pushdown(stmt);
204
205        // Final guard-rail checks (EPIC-048).
206        self.check_guardrails_and_record(&ctx, results.len())?;
207
208        // Post-processing: DISTINCT, ORDER BY, LIMIT.
209        results = self.apply_select_postprocessing(stmt, results, params, limit)?;
210
211        // Update QueryPlanner adaptive stats for vector/SELECT queries (Fix #8).
212        if extracted.vector_search.is_some() {
213            // Reason: u128->u64 cast; query durations < u64::MAX µs (~585 millennia)
214            #[allow(clippy::cast_possible_truncation)]
215            let vector_latency_us = ctx.elapsed().as_micros() as u64;
216            self.query_planner
217                .stats()
218                .update_vector_latency(vector_latency_us);
219        }
220        self.guard_rails.circuit_breaker.record_success();
221        Ok(results)
222    }
223
224    /// Extracts all query components from the SELECT statement's WHERE clause.
225    fn extract_query_components(
226        &self,
227        stmt: &crate::velesql::SelectStatement,
228        params: &std::collections::HashMap<String, serde_json::Value>,
229    ) -> Result<ExtractedComponents> {
230        let mut vector_search = None;
231        let mut similarity_conditions = Vec::new();
232        let mut filter_condition = None;
233        let mut graph_match_predicates = Vec::new();
234        let mut sparse_vector_search = None;
235
236        let is_union_query = stmt
237            .where_clause
238            .as_ref()
239            .is_some_and(Self::has_similarity_in_problematic_or);
240        let is_not_similarity_query = stmt
241            .where_clause
242            .as_ref()
243            .is_some_and(Self::has_similarity_under_not);
244
245        if let Some(ref cond) = stmt.where_clause {
246            Self::validate_similarity_query_structure(cond)?;
247            Self::collect_graph_match_predicates(cond, &mut graph_match_predicates);
248            sparse_vector_search = Self::extract_sparse_vector_search(cond).cloned();
249
250            let mut extracted_cond = cond.clone();
251            vector_search = self.extract_vector_search(&mut extracted_cond, params)?;
252            similarity_conditions =
253                self.extract_all_similarity_conditions(&extracted_cond, params)?;
254            filter_condition = Some(extracted_cond);
255        }
256
257        Ok(ExtractedComponents {
258            vector_search,
259            similarity_conditions,
260            filter_condition,
261            graph_match_predicates,
262            sparse_vector_search,
263            is_union_query,
264            is_not_similarity_query,
265        })
266    }
267
268    /// Attempts early-return paths: NOT-similarity, union, and sparse queries.
269    ///
270    /// Returns `Ok(Some(results))` if an early path was taken, `Ok(None)` otherwise.
271    fn try_early_return_path(
272        &self,
273        stmt: &crate::velesql::SelectStatement,
274        params: &std::collections::HashMap<String, serde_json::Value>,
275        extracted: &ExtractedComponents,
276        limit: usize,
277        ctx: &crate::guardrails::QueryContext,
278    ) -> Result<Option<Vec<SearchResult>>> {
279        if let Some(results) =
280            self.try_not_similarity_or_union(stmt, params, extracted, limit, ctx)?
281        {
282            return Ok(Some(results));
283        }
284
285        // Phase 5: Sparse-only or hybrid dense+sparse execution.
286        if let Some(ref svs) = extracted.sparse_vector_search {
287            let results = self.dispatch_sparse_query(stmt, params, extracted, svs, limit, ctx)?;
288            return Ok(Some(results));
289        }
290
291        Ok(None)
292    }
293
294    /// Handles NOT-similarity and union early-return paths.
295    fn try_not_similarity_or_union(
296        &self,
297        stmt: &crate::velesql::SelectStatement,
298        params: &std::collections::HashMap<String, serde_json::Value>,
299        extracted: &ExtractedComponents,
300        limit: usize,
301        ctx: &crate::guardrails::QueryContext,
302    ) -> Result<Option<Vec<SearchResult>>> {
303        let cond = match stmt.where_clause.as_ref() {
304            Some(c) if extracted.is_not_similarity_query || extracted.is_union_query => c,
305            _ => return Ok(None),
306        };
307
308        let has_graph_predicates = !extracted.graph_match_predicates.is_empty();
309        let execution_limit = if has_graph_predicates {
310            MAX_LIMIT
311        } else {
312            limit
313        };
314
315        let early_ctx = EarlyReturnCtx {
316            stmt,
317            params,
318            cond,
319            has_graph_predicates,
320            limit,
321            ctx,
322        };
323
324        // EPIC-044 US-003: NOT similarity() requires full scan
325        if extracted.is_not_similarity_query {
326            let results = self.execute_early_return_query(
327                |s| s.execute_not_similarity_query(cond, params, execution_limit),
328                &early_ctx,
329            )?;
330            return Ok(Some(results));
331        }
332
333        // EPIC-044 US-002: Union mode for similarity() OR metadata
334        let results = self.execute_early_return_query(
335            |s| s.execute_union_query(cond, params, execution_limit),
336            &early_ctx,
337        )?;
338        Ok(Some(results))
339    }
340
341    /// Executes an early-return query path with guard-rail checks and post-processing.
342    fn execute_early_return_query(
343        &self,
344        execute_fn: impl FnOnce(&Self) -> Result<Vec<SearchResult>>,
345        early: &EarlyReturnCtx<'_>,
346    ) -> Result<Vec<SearchResult>> {
347        let mut results =
348            execute_fn(self).inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
349        if early.has_graph_predicates {
350            results = self
351                .apply_where_condition_to_results(
352                    results,
353                    early.cond,
354                    early.params,
355                    &early.stmt.from_alias,
356                )
357                .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
358        }
359        if let Some(ref order_by) = early.stmt.order_by {
360            self.apply_order_by(&mut results, order_by, early.params)
361                .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
362        }
363        results.truncate(early.limit);
364        self.check_guardrails_and_record(early.ctx, results.len())?;
365        self.guard_rails.circuit_breaker.record_success();
366        Ok(results)
367    }
368
369    // NOTE: dispatch_sparse_query, execute_sparse_or_hybrid, filter_by_graph_predicates,
370    // finalize_sparse_results, resolve_fusion_strategy moved to sparse_dispatch.rs (T3-3)
371
372    // NOTE: compute_cbo_strategy, dispatch_main_select, dispatch_match_query,
373    // analyze_join_pushdown, apply_select_postprocessing moved to select_dispatch.rs
374
375    /// Checks timeout and cardinality guard-rails, recording failure on violation.
376    fn check_guardrails_and_record(
377        &self,
378        ctx: &crate::guardrails::QueryContext,
379        result_count: usize,
380    ) -> Result<()> {
381        ctx.check_timeout()
382            .map_err(crate::error::Error::from)
383            .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
384        ctx.check_cardinality(result_count)
385            .map_err(crate::error::Error::from)
386            .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
387        Ok(())
388    }
389
390    /// Parses and executes a VelesQL query string, using the collection-level parse cache (P1-A).
391    ///
392    /// Equivalent to calling `Parser::parse(sql)` followed by `execute_query()`, but caches
393    /// parsed ASTs so repeated identical queries avoid re-parsing overhead.
394    ///
395    /// # Arguments
396    ///
397    /// * `sql` - Raw VelesQL query string
398    /// * `params` - Query parameters for resolving placeholders (e.g., `$v`)
399    ///
400    /// # Errors
401    ///
402    /// Returns a parse error if `sql` is invalid, or an execution error if the query fails.
403    pub fn execute_query_str(
404        &self,
405        sql: &str,
406        params: &std::collections::HashMap<String, serde_json::Value>,
407    ) -> Result<Vec<SearchResult>> {
408        let query = self
409            .query_cache
410            .parse(sql)
411            .map_err(|e| crate::error::Error::Query(e.to_string()))?;
412        self.execute_query(&query, params)
413    }
414
415    // NOTE: apply_distinct and compute_distinct_key moved to distinct.rs
416    // (EPIC-061/US-003 refactoring)
417
418    // NOTE: filter_by_similarity, execute_not_similarity_query, extract_not_similarity_condition,
419    // execute_scan_query moved to similarity_filter.rs (Plan 04-04)
420
421    // NOTE: execute_union_query, matches_metadata_filter, split_or_condition_with_outer_filter
422    // moved to union_query.rs (Plan 04-04)
423}