Skip to main content

velesdb_core/collection/search/query/
mod.rs

1//! VelesQL query execution for Collection.
2//!
3//! This module orchestrates query execution by combining:
4//! - Query validation (`validation.rs`)
5//! - Condition extraction (`extraction.rs`)
6//! - ORDER BY processing (`ordering.rs`)
7//!
8//! # Future Enhancement: HybridExecutionPlan Integration
9//!
10//! The `HybridExecutionPlan` and `choose_hybrid_strategy()` in `planner.rs`
11//! are ready for integration to optimize query execution based on:
12//! - Query pattern (ORDER BY similarity, filters, etc.)
13//! - Runtime statistics (latency, selectivity)
14//! - Over-fetch factor for filtered queries
15//!
16//! Future: Integrate `QueryPlanner::choose_hybrid_strategy()` into `execute_query()`
17//! to leverage cost-based optimization for complex queries.
18
19#![allow(clippy::uninlined_format_args)] // Prefer readability in query error paths.
20#![allow(clippy::implicit_hasher)] // HashSet hasher genericity adds noise for internal APIs.
21
22mod aggregation;
23pub(crate) mod condition_tree;
24mod distinct;
25#[cfg(test)]
26mod distinct_tests;
27mod execution_paths;
28mod extraction;
29#[cfg(test)]
30mod extraction_tests;
31mod hybrid_sparse;
32#[cfg(test)]
33mod hybrid_sparse_tests;
34pub mod join;
35#[cfg(test)]
36mod join_tests;
37pub mod match_exec;
38#[cfg(test)]
39mod match_exec_tests;
40pub mod match_metrics;
41#[cfg(test)]
42mod match_metrics_tests;
43pub mod match_planner;
44#[cfg(test)]
45mod match_planner_tests;
46mod multi_vector;
47#[cfg(test)]
48mod multi_vector_tests;
49mod ordering;
50#[cfg(test)]
51mod ordering_tests;
52pub mod parallel_traversal;
53#[cfg(test)]
54mod parallel_traversal_tests;
55pub mod projection;
56pub mod pushdown;
57#[cfg(test)]
58mod pushdown_tests;
59pub mod score_fusion;
60#[cfg(test)]
61mod score_fusion_tests;
62mod select_dispatch;
63pub(crate) mod set_operations;
64mod similarity_filter;
65mod sparse_dispatch;
66mod union_query;
67mod validation;
68mod where_eval;
69
70// Re-export for potential external use
71#[allow(unused_imports)]
72pub use ordering::compare_json_values;
73// Re-export join functions for future integration with execute_query
74#[allow(unused_imports)]
75pub use join::{execute_join, JoinedResult};
76
77use crate::collection::types::Collection;
78use crate::error::Result;
79use crate::point::SearchResult;
80use std::collections::HashSet;
81
/// Maximum allowed LIMIT value to prevent overflow in over-fetch calculations.
///
/// Within this module it serves three purposes:
/// - upper clamp for the effective `LIMIT` of a SELECT statement,
/// - per-operand limit when executing the two sides of a compound query,
/// - execution limit for early-return paths that carry graph predicates.
const MAX_LIMIT: usize = 100_000;
84
/// Context for early-return query paths (NOT-similarity, union).
///
/// Bundles the borrowed inputs consumed by `execute_early_return_query` so the
/// shared post-processing (graph re-filtering, ORDER BY, truncation, guard-rails)
/// does not need a long parameter list.
struct EarlyReturnCtx<'a> {
    /// SELECT statement being executed; read for `from_alias` and `order_by`.
    stmt: &'a crate::velesql::SelectStatement,
    /// Query parameters forwarded to WHERE re-evaluation and ORDER BY.
    params: &'a std::collections::HashMap<String, serde_json::Value>,
    /// WHERE condition re-applied to the results when `has_graph_predicates` is set.
    cond: &'a crate::velesql::Condition,
    /// `true` when at least one graph MATCH predicate was collected from the WHERE clause.
    has_graph_predicates: bool,
    /// Number of results kept by the final truncation.
    limit: usize,
    /// Per-query guard-rail context used for the final timeout/cardinality check.
    ctx: &'a crate::guardrails::QueryContext,
}
94
/// Extracted query components from the WHERE clause.
///
/// Built by `extract_query_components`; when the statement has no WHERE clause
/// every optional field is `None` and every list is empty.
struct ExtractedComponents {
    /// Result of `extract_vector_search` on the WHERE clause, if any.
    vector_search: Option<Vec<f32>>,
    /// Result of `extract_all_similarity_conditions`.
    /// NOTE(review): tuple is presumably (field, query vector, operator, threshold) — confirm
    /// against `extraction.rs`.
    similarity_conditions: Vec<(String, Vec<f32>, crate::velesql::CompareOp, f64)>,
    /// Clone of the WHERE condition after `extract_vector_search` had mutable access to it.
    filter_condition: Option<crate::velesql::Condition>,
    /// Graph MATCH predicates gathered by `collect_graph_match_predicates`.
    graph_match_predicates: Vec<crate::velesql::GraphMatchPredicate>,
    /// Sparse vector search clause cloned out of the WHERE condition, if present.
    sparse_vector_search: Option<crate::velesql::SparseVectorSearch>,
    /// Set when `has_similarity_in_problematic_or` matches the WHERE clause (union path).
    is_union_query: bool,
    /// Set when `has_similarity_under_not` matches the WHERE clause (NOT-similarity path).
    is_not_similarity_query: bool,
}
105
106impl Collection {
107    /// Executes a `VelesQL` query on this collection with the `"default"` client id.
108    ///
109    /// This method unifies vector search, text search, and metadata filtering
110    /// into a single interface. Compound queries (`UNION`, `INTERSECT`, `EXCEPT`)
111    /// are resolved here before delegation. For per-client rate limiting use
112    /// [`execute_query_with_client`](Self::execute_query_with_client).
113    ///
114    /// # Errors
115    ///
116    /// Returns an error if the query cannot be executed (e.g., missing parameters).
117    pub fn execute_query(
118        &self,
119        query: &crate::velesql::Query,
120        params: &std::collections::HashMap<String, serde_json::Value>,
121    ) -> Result<Vec<SearchResult>> {
122        // EPIC-040 US-006: For compound queries, execute each operand without the
123        // outer LIMIT so the set operation sees the full result sets.  The final
124        // LIMIT is applied once on the merged output (SQL-standard behaviour).
125        // Use MAX_LIMIT (not None) to avoid the default-10 cap in execute_query_with_client.
126        let compound_limit = Some(u64::try_from(MAX_LIMIT).unwrap_or(u64::MAX));
127        let left_results = if query.compound.is_some() {
128            let mut left_query = query.clone();
129            left_query.select.limit = compound_limit;
130            left_query.compound = None;
131            self.execute_query_with_client(&left_query, params, "default")?
132        } else {
133            return self.execute_query_with_client(query, params, "default");
134        };
135
136        // compound is guaranteed Some here (non-compound returns above).
137        if let Some(ref compound) = query.compound {
138            let mut right_query = crate::velesql::Query::new_select(*compound.right.clone());
139            right_query.select.limit = compound_limit;
140            let right_results = self.execute_query_with_client(&right_query, params, "default")?;
141            let mut merged =
142                set_operations::apply_set_operation(left_results, right_results, compound.operator);
143            // SQL-standard: LIMIT from the left (outer) SELECT applies to the final result.
144            if let Some(limit) = query.select.limit {
145                merged.truncate(usize::try_from(limit).unwrap_or(usize::MAX));
146            }
147            return Ok(merged);
148        }
149
150        Ok(left_results)
151    }
152
153    /// Executes a `VelesQL` query with a specific client identifier for per-client rate limiting.
154    ///
155    /// Each distinct `client_id` maintains an independent token bucket, so one
156    /// busy client cannot exhaust the quota of another.
157    ///
158    /// # Errors
159    ///
160    /// Returns an error if the query cannot be executed or a guard-rail fires.
161    pub fn execute_query_with_client(
162        &self,
163        query: &crate::velesql::Query,
164        params: &std::collections::HashMap<String, serde_json::Value>,
165        client_id: &str,
166    ) -> Result<Vec<SearchResult>> {
167        // Guard-rail pre-checks: circuit breaker + rate limiting (EPIC-048).
168        self.guard_rails
169            .pre_check(client_id)
170            .map_err(crate::error::Error::from)?;
171
172        // Create per-query execution context for timeout + cardinality tracking.
173        let ctx = self.guard_rails.create_context();
174
175        crate::velesql::QueryValidator::validate(query)
176            .map_err(|e| crate::error::Error::Query(e.to_string()))?;
177
178        // Unified VelesQL dispatch: allow Collection::execute_query() to run top-level MATCH queries.
179        if let Some(match_clause) = query.match_clause.as_ref() {
180            return self.dispatch_match_query(match_clause, params, &ctx);
181        }
182
183        let stmt = &query.select;
184        let limit = usize::try_from(stmt.limit.unwrap_or(10))
185            .unwrap_or(MAX_LIMIT)
186            .min(MAX_LIMIT);
187
188        let extracted = self.extract_query_components(stmt, params)?;
189
190        // Early-return paths for special query shapes.
191        if let Some(results) = self.try_early_return_path(stmt, params, &extracted, limit, &ctx)? {
192            return Ok(results);
193        }
194
195        // Main vector/similarity/metadata dispatch path.
196        let mut results = self.dispatch_main_select(stmt, params, &extracted, limit, &ctx)?;
197
198        // JOIN pushdown analysis (EPIC-031 US-006).
199        self.analyze_join_pushdown(stmt);
200
201        // Final guard-rail checks (EPIC-048).
202        self.check_guardrails_and_record(&ctx, results.len())?;
203
204        // Post-processing: DISTINCT, ORDER BY, LIMIT.
205        results = self.apply_select_postprocessing(stmt, results, params, limit)?;
206
207        // Update QueryPlanner adaptive stats for vector/SELECT queries (Fix #8).
208        if extracted.vector_search.is_some() {
209            // Reason: u128->u64 cast; query durations < u64::MAX µs (~585 millennia)
210            #[allow(clippy::cast_possible_truncation)]
211            let vector_latency_us = ctx.elapsed().as_micros() as u64;
212            self.query_planner
213                .stats()
214                .update_vector_latency(vector_latency_us);
215        }
216        self.guard_rails.circuit_breaker.record_success();
217        Ok(results)
218    }
219
220    /// Extracts all query components from the SELECT statement's WHERE clause.
221    fn extract_query_components(
222        &self,
223        stmt: &crate::velesql::SelectStatement,
224        params: &std::collections::HashMap<String, serde_json::Value>,
225    ) -> Result<ExtractedComponents> {
226        let mut vector_search = None;
227        let mut similarity_conditions = Vec::new();
228        let mut filter_condition = None;
229        let mut graph_match_predicates = Vec::new();
230        let mut sparse_vector_search = None;
231
232        let is_union_query = stmt
233            .where_clause
234            .as_ref()
235            .is_some_and(Self::has_similarity_in_problematic_or);
236        let is_not_similarity_query = stmt
237            .where_clause
238            .as_ref()
239            .is_some_and(Self::has_similarity_under_not);
240
241        if let Some(ref cond) = stmt.where_clause {
242            Self::validate_similarity_query_structure(cond)?;
243            Self::collect_graph_match_predicates(cond, &mut graph_match_predicates);
244            sparse_vector_search = Self::extract_sparse_vector_search(cond).cloned();
245
246            let mut extracted_cond = cond.clone();
247            vector_search = self.extract_vector_search(&mut extracted_cond, params)?;
248            similarity_conditions =
249                self.extract_all_similarity_conditions(&extracted_cond, params)?;
250            filter_condition = Some(extracted_cond);
251        }
252
253        Ok(ExtractedComponents {
254            vector_search,
255            similarity_conditions,
256            filter_condition,
257            graph_match_predicates,
258            sparse_vector_search,
259            is_union_query,
260            is_not_similarity_query,
261        })
262    }
263
264    /// Attempts early-return paths: NOT-similarity, union, and sparse queries.
265    ///
266    /// Returns `Ok(Some(results))` if an early path was taken, `Ok(None)` otherwise.
267    fn try_early_return_path(
268        &self,
269        stmt: &crate::velesql::SelectStatement,
270        params: &std::collections::HashMap<String, serde_json::Value>,
271        extracted: &ExtractedComponents,
272        limit: usize,
273        ctx: &crate::guardrails::QueryContext,
274    ) -> Result<Option<Vec<SearchResult>>> {
275        if let Some(results) =
276            self.try_not_similarity_or_union(stmt, params, extracted, limit, ctx)?
277        {
278            return Ok(Some(results));
279        }
280
281        // Phase 5: Sparse-only or hybrid dense+sparse execution.
282        if let Some(ref svs) = extracted.sparse_vector_search {
283            let results = self.dispatch_sparse_query(stmt, params, extracted, svs, limit, ctx)?;
284            return Ok(Some(results));
285        }
286
287        Ok(None)
288    }
289
290    /// Handles NOT-similarity and union early-return paths.
291    fn try_not_similarity_or_union(
292        &self,
293        stmt: &crate::velesql::SelectStatement,
294        params: &std::collections::HashMap<String, serde_json::Value>,
295        extracted: &ExtractedComponents,
296        limit: usize,
297        ctx: &crate::guardrails::QueryContext,
298    ) -> Result<Option<Vec<SearchResult>>> {
299        let cond = match stmt.where_clause.as_ref() {
300            Some(c) if extracted.is_not_similarity_query || extracted.is_union_query => c,
301            _ => return Ok(None),
302        };
303
304        let has_graph_predicates = !extracted.graph_match_predicates.is_empty();
305        let execution_limit = if has_graph_predicates {
306            MAX_LIMIT
307        } else {
308            limit
309        };
310
311        let early_ctx = EarlyReturnCtx {
312            stmt,
313            params,
314            cond,
315            has_graph_predicates,
316            limit,
317            ctx,
318        };
319
320        // EPIC-044 US-003: NOT similarity() requires full scan
321        if extracted.is_not_similarity_query {
322            let results = self.execute_early_return_query(
323                |s| s.execute_not_similarity_query(cond, params, execution_limit),
324                &early_ctx,
325            )?;
326            return Ok(Some(results));
327        }
328
329        // EPIC-044 US-002: Union mode for similarity() OR metadata
330        let results = self.execute_early_return_query(
331            |s| s.execute_union_query(cond, params, execution_limit),
332            &early_ctx,
333        )?;
334        Ok(Some(results))
335    }
336
337    /// Executes an early-return query path with guard-rail checks and post-processing.
338    fn execute_early_return_query(
339        &self,
340        execute_fn: impl FnOnce(&Self) -> Result<Vec<SearchResult>>,
341        early: &EarlyReturnCtx<'_>,
342    ) -> Result<Vec<SearchResult>> {
343        let mut results =
344            execute_fn(self).inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
345        if early.has_graph_predicates {
346            results = self
347                .apply_where_condition_to_results(
348                    results,
349                    early.cond,
350                    early.params,
351                    &early.stmt.from_alias,
352                )
353                .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
354        }
355        if let Some(ref order_by) = early.stmt.order_by {
356            self.apply_order_by(&mut results, order_by, early.params)
357                .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
358        }
359        results.truncate(early.limit);
360        self.check_guardrails_and_record(early.ctx, results.len())?;
361        self.guard_rails.circuit_breaker.record_success();
362        Ok(results)
363    }
364
365    // NOTE: dispatch_sparse_query, execute_sparse_or_hybrid, filter_by_graph_predicates,
366    // finalize_sparse_results, resolve_fusion_strategy moved to sparse_dispatch.rs (T3-3)
367
368    // NOTE: compute_cbo_strategy, dispatch_main_select, dispatch_match_query,
369    // analyze_join_pushdown, apply_select_postprocessing moved to select_dispatch.rs
370
371    /// Checks timeout and cardinality guard-rails, recording failure on violation.
372    fn check_guardrails_and_record(
373        &self,
374        ctx: &crate::guardrails::QueryContext,
375        result_count: usize,
376    ) -> Result<()> {
377        ctx.check_timeout()
378            .map_err(crate::error::Error::from)
379            .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
380        ctx.check_cardinality(result_count)
381            .map_err(crate::error::Error::from)
382            .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
383        Ok(())
384    }
385
386    /// Parses and executes a VelesQL query string, using the collection-level parse cache (P1-A).
387    ///
388    /// Equivalent to calling `Parser::parse(sql)` followed by `execute_query()`, but caches
389    /// parsed ASTs so repeated identical queries avoid re-parsing overhead.
390    ///
391    /// # Arguments
392    ///
393    /// * `sql` - Raw VelesQL query string
394    /// * `params` - Query parameters for resolving placeholders (e.g., `$v`)
395    ///
396    /// # Errors
397    ///
398    /// Returns a parse error if `sql` is invalid, or an execution error if the query fails.
399    pub fn execute_query_str(
400        &self,
401        sql: &str,
402        params: &std::collections::HashMap<String, serde_json::Value>,
403    ) -> Result<Vec<SearchResult>> {
404        let query = self
405            .query_cache
406            .parse(sql)
407            .map_err(|e| crate::error::Error::Query(e.to_string()))?;
408        self.execute_query(&query, params)
409    }
410
411    // NOTE: apply_distinct and compute_distinct_key moved to distinct.rs
412    // (EPIC-061/US-003 refactoring)
413
414    // NOTE: filter_by_similarity, execute_not_similarity_query, extract_not_similarity_condition,
415    // execute_scan_query moved to similarity_filter.rs (Plan 04-04)
416
417    // NOTE: execute_union_query, matches_metadata_filter, split_or_condition_with_outer_filter
418    // moved to union_query.rs (Plan 04-04)
419}