velesdb_core/collection/search/query/
mod.rs1#![allow(clippy::uninlined_format_args)] #![allow(clippy::implicit_hasher)] mod aggregation;
23pub(crate) mod condition_tree;
24mod distinct;
25#[cfg(test)]
26mod distinct_tests;
27mod execution_paths;
28mod extraction;
29#[cfg(test)]
30mod extraction_tests;
31mod hybrid_sparse;
32#[cfg(test)]
33mod hybrid_sparse_tests;
34pub mod join;
35#[cfg(test)]
36mod join_tests;
37pub mod match_exec;
38#[cfg(test)]
39mod match_exec_tests;
40pub mod match_metrics;
41#[cfg(test)]
42mod match_metrics_tests;
43pub mod match_planner;
44#[cfg(test)]
45mod match_planner_tests;
46mod multi_vector;
47#[cfg(test)]
48mod multi_vector_tests;
49mod ordering;
50#[cfg(test)]
51mod ordering_tests;
52pub mod parallel_traversal;
53#[cfg(test)]
54mod parallel_traversal_tests;
55pub mod projection;
56pub mod pushdown;
57#[cfg(test)]
58mod pushdown_tests;
59pub mod score_fusion;
60#[cfg(test)]
61mod score_fusion_tests;
62mod select_dispatch;
63pub(crate) mod set_operations;
64mod similarity_filter;
65mod sparse_dispatch;
66mod union_query;
67mod validation;
68mod where_eval;
69
70#[allow(unused_imports)]
72pub use ordering::compare_json_values;
73#[allow(unused_imports)]
75pub use join::{execute_join, JoinedResult};
76
77use crate::collection::types::Collection;
78use crate::error::Result;
79use crate::point::SearchResult;
80use std::collections::HashSet;
81
/// Hard ceiling on the number of rows requested from any single execution path.
///
/// It caps the user-supplied `LIMIT`, serves as the per-operand limit for
/// compound (set-operation) queries, and is the over-fetch size used by the
/// early-return paths when graph predicates must be re-applied afterwards.
const MAX_LIMIT: usize = 100_000;
84
85struct EarlyReturnCtx<'a> {
87 stmt: &'a crate::velesql::SelectStatement,
88 params: &'a std::collections::HashMap<String, serde_json::Value>,
89 cond: &'a crate::velesql::Condition,
90 has_graph_predicates: bool,
91 limit: usize,
92 ctx: &'a crate::guardrails::QueryContext,
93}
94
95struct ExtractedComponents {
97 vector_search: Option<Vec<f32>>,
98 similarity_conditions: Vec<(String, Vec<f32>, crate::velesql::CompareOp, f64)>,
99 filter_condition: Option<crate::velesql::Condition>,
100 graph_match_predicates: Vec<crate::velesql::GraphMatchPredicate>,
101 sparse_vector_search: Option<crate::velesql::SparseVectorSearch>,
102 is_union_query: bool,
103 is_not_similarity_query: bool,
104}
105
106impl Collection {
107 pub fn execute_query(
118 &self,
119 query: &crate::velesql::Query,
120 params: &std::collections::HashMap<String, serde_json::Value>,
121 ) -> Result<Vec<SearchResult>> {
122 let compound_limit = Some(u64::try_from(MAX_LIMIT).unwrap_or(u64::MAX));
127 let left_results = if query.compound.is_some() {
128 let mut left_query = query.clone();
129 left_query.select.limit = compound_limit;
130 left_query.compound = None;
131 self.execute_query_with_client(&left_query, params, "default")?
132 } else {
133 return self.execute_query_with_client(query, params, "default");
134 };
135
136 if let Some(ref compound) = query.compound {
138 let mut right_query = crate::velesql::Query::new_select(*compound.right.clone());
139 right_query.select.limit = compound_limit;
140 let right_results = self.execute_query_with_client(&right_query, params, "default")?;
141 let mut merged =
142 set_operations::apply_set_operation(left_results, right_results, compound.operator);
143 if let Some(limit) = query.select.limit {
145 merged.truncate(usize::try_from(limit).unwrap_or(usize::MAX));
146 }
147 return Ok(merged);
148 }
149
150 Ok(left_results)
151 }
152
153 pub fn execute_query_with_client(
162 &self,
163 query: &crate::velesql::Query,
164 params: &std::collections::HashMap<String, serde_json::Value>,
165 client_id: &str,
166 ) -> Result<Vec<SearchResult>> {
167 self.guard_rails
169 .pre_check(client_id)
170 .map_err(crate::error::Error::from)?;
171
172 let ctx = self.guard_rails.create_context();
174
175 crate::velesql::QueryValidator::validate(query)
176 .map_err(|e| crate::error::Error::Query(e.to_string()))?;
177
178 if let Some(match_clause) = query.match_clause.as_ref() {
180 return self.dispatch_match_query(match_clause, params, &ctx);
181 }
182
183 let stmt = &query.select;
184 let limit = usize::try_from(stmt.limit.unwrap_or(10))
185 .unwrap_or(MAX_LIMIT)
186 .min(MAX_LIMIT);
187
188 let extracted = self.extract_query_components(stmt, params)?;
189
190 if let Some(results) = self.try_early_return_path(stmt, params, &extracted, limit, &ctx)? {
192 return Ok(results);
193 }
194
195 let mut results = self.dispatch_main_select(stmt, params, &extracted, limit, &ctx)?;
197
198 self.analyze_join_pushdown(stmt);
200
201 self.check_guardrails_and_record(&ctx, results.len())?;
203
204 results = self.apply_select_postprocessing(stmt, results, params, limit)?;
206
207 if extracted.vector_search.is_some() {
209 #[allow(clippy::cast_possible_truncation)]
211 let vector_latency_us = ctx.elapsed().as_micros() as u64;
212 self.query_planner
213 .stats()
214 .update_vector_latency(vector_latency_us);
215 }
216 self.guard_rails.circuit_breaker.record_success();
217 Ok(results)
218 }
219
220 fn extract_query_components(
222 &self,
223 stmt: &crate::velesql::SelectStatement,
224 params: &std::collections::HashMap<String, serde_json::Value>,
225 ) -> Result<ExtractedComponents> {
226 let mut vector_search = None;
227 let mut similarity_conditions = Vec::new();
228 let mut filter_condition = None;
229 let mut graph_match_predicates = Vec::new();
230 let mut sparse_vector_search = None;
231
232 let is_union_query = stmt
233 .where_clause
234 .as_ref()
235 .is_some_and(Self::has_similarity_in_problematic_or);
236 let is_not_similarity_query = stmt
237 .where_clause
238 .as_ref()
239 .is_some_and(Self::has_similarity_under_not);
240
241 if let Some(ref cond) = stmt.where_clause {
242 Self::validate_similarity_query_structure(cond)?;
243 Self::collect_graph_match_predicates(cond, &mut graph_match_predicates);
244 sparse_vector_search = Self::extract_sparse_vector_search(cond).cloned();
245
246 let mut extracted_cond = cond.clone();
247 vector_search = self.extract_vector_search(&mut extracted_cond, params)?;
248 similarity_conditions =
249 self.extract_all_similarity_conditions(&extracted_cond, params)?;
250 filter_condition = Some(extracted_cond);
251 }
252
253 Ok(ExtractedComponents {
254 vector_search,
255 similarity_conditions,
256 filter_condition,
257 graph_match_predicates,
258 sparse_vector_search,
259 is_union_query,
260 is_not_similarity_query,
261 })
262 }
263
264 fn try_early_return_path(
268 &self,
269 stmt: &crate::velesql::SelectStatement,
270 params: &std::collections::HashMap<String, serde_json::Value>,
271 extracted: &ExtractedComponents,
272 limit: usize,
273 ctx: &crate::guardrails::QueryContext,
274 ) -> Result<Option<Vec<SearchResult>>> {
275 if let Some(results) =
276 self.try_not_similarity_or_union(stmt, params, extracted, limit, ctx)?
277 {
278 return Ok(Some(results));
279 }
280
281 if let Some(ref svs) = extracted.sparse_vector_search {
283 let results = self.dispatch_sparse_query(stmt, params, extracted, svs, limit, ctx)?;
284 return Ok(Some(results));
285 }
286
287 Ok(None)
288 }
289
290 fn try_not_similarity_or_union(
292 &self,
293 stmt: &crate::velesql::SelectStatement,
294 params: &std::collections::HashMap<String, serde_json::Value>,
295 extracted: &ExtractedComponents,
296 limit: usize,
297 ctx: &crate::guardrails::QueryContext,
298 ) -> Result<Option<Vec<SearchResult>>> {
299 let cond = match stmt.where_clause.as_ref() {
300 Some(c) if extracted.is_not_similarity_query || extracted.is_union_query => c,
301 _ => return Ok(None),
302 };
303
304 let has_graph_predicates = !extracted.graph_match_predicates.is_empty();
305 let execution_limit = if has_graph_predicates {
306 MAX_LIMIT
307 } else {
308 limit
309 };
310
311 let early_ctx = EarlyReturnCtx {
312 stmt,
313 params,
314 cond,
315 has_graph_predicates,
316 limit,
317 ctx,
318 };
319
320 if extracted.is_not_similarity_query {
322 let results = self.execute_early_return_query(
323 |s| s.execute_not_similarity_query(cond, params, execution_limit),
324 &early_ctx,
325 )?;
326 return Ok(Some(results));
327 }
328
329 let results = self.execute_early_return_query(
331 |s| s.execute_union_query(cond, params, execution_limit),
332 &early_ctx,
333 )?;
334 Ok(Some(results))
335 }
336
337 fn execute_early_return_query(
339 &self,
340 execute_fn: impl FnOnce(&Self) -> Result<Vec<SearchResult>>,
341 early: &EarlyReturnCtx<'_>,
342 ) -> Result<Vec<SearchResult>> {
343 let mut results =
344 execute_fn(self).inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
345 if early.has_graph_predicates {
346 results = self
347 .apply_where_condition_to_results(
348 results,
349 early.cond,
350 early.params,
351 &early.stmt.from_alias,
352 )
353 .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
354 }
355 if let Some(ref order_by) = early.stmt.order_by {
356 self.apply_order_by(&mut results, order_by, early.params)
357 .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
358 }
359 results.truncate(early.limit);
360 self.check_guardrails_and_record(early.ctx, results.len())?;
361 self.guard_rails.circuit_breaker.record_success();
362 Ok(results)
363 }
364
365 fn check_guardrails_and_record(
373 &self,
374 ctx: &crate::guardrails::QueryContext,
375 result_count: usize,
376 ) -> Result<()> {
377 ctx.check_timeout()
378 .map_err(crate::error::Error::from)
379 .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
380 ctx.check_cardinality(result_count)
381 .map_err(crate::error::Error::from)
382 .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
383 Ok(())
384 }
385
386 pub fn execute_query_str(
400 &self,
401 sql: &str,
402 params: &std::collections::HashMap<String, serde_json::Value>,
403 ) -> Result<Vec<SearchResult>> {
404 let query = self
405 .query_cache
406 .parse(sql)
407 .map_err(|e| crate::error::Error::Query(e.to_string()))?;
408 self.execute_query(&query, params)
409 }
410
411 }