velesdb_core/collection/search/query/
mod.rs1#![allow(clippy::uninlined_format_args)] #![allow(clippy::implicit_hasher)] mod aggregation;
23mod distinct;
24mod execution_paths;
25mod extraction;
26#[cfg(test)]
27mod extraction_tests;
28mod hybrid_sparse;
29#[cfg(test)]
30mod hybrid_sparse_tests;
31pub mod join;
32#[cfg(test)]
33mod join_tests;
34pub mod match_exec;
35#[cfg(test)]
36mod match_exec_tests;
37pub mod match_metrics;
38#[cfg(test)]
39mod match_metrics_tests;
40pub mod match_planner;
41#[cfg(test)]
42mod match_planner_tests;
43mod multi_vector;
44mod ordering;
45pub mod parallel_traversal;
46#[cfg(test)]
47mod parallel_traversal_tests;
48pub mod pushdown;
49#[cfg(test)]
50mod pushdown_tests;
51pub mod score_fusion;
52#[cfg(test)]
53mod score_fusion_tests;
54mod similarity_filter;
55mod union_query;
56mod validation;
57mod where_eval;
58
59#[allow(unused_imports)]
61pub use ordering::compare_json_values;
62#[allow(unused_imports)]
64pub use join::{execute_join, JoinedResult};
65
66use crate::collection::types::Collection;
67use crate::error::Result;
68use crate::point::SearchResult;
69use std::collections::HashSet;
70
/// Hard ceiling on the number of results a single query may ask for.
///
/// User-supplied `LIMIT` values (SELECT and MATCH) are clamped to this bound,
/// and it is also used as the fetch size when graph predicates have to be
/// re-checked after the initial search.
const MAX_LIMIT: usize = 100_000;
73
74impl Collection {
75 pub fn execute_query(
85 &self,
86 query: &crate::velesql::Query,
87 params: &std::collections::HashMap<String, serde_json::Value>,
88 ) -> Result<Vec<SearchResult>> {
89 self.execute_query_with_client(query, params, "default")
90 }
91
92 #[allow(clippy::too_many_lines)] pub fn execute_query_with_client(
102 &self,
103 query: &crate::velesql::Query,
104 params: &std::collections::HashMap<String, serde_json::Value>,
105 client_id: &str,
106 ) -> Result<Vec<SearchResult>> {
107 self.guard_rails
109 .pre_check(client_id)
110 .map_err(crate::error::Error::from)?;
111
112 let ctx = self.guard_rails.create_context();
114
115 crate::velesql::QueryValidator::validate(query)
116 .map_err(|e| crate::error::Error::Query(e.to_string()))?;
117
118 if let Some(match_clause) = query.match_clause.as_ref() {
120 let match_results =
121 self.execute_match_with_context(match_clause, params, Some(&ctx))?;
122
123 ctx.check_timeout()
125 .map_err(crate::error::Error::from)
126 .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
127
128 let mut sorted = match_results;
129 if let Some(order_by) = match_clause.return_clause.order_by.as_ref() {
130 for item in order_by.iter().rev() {
131 self.order_match_results(&mut sorted, &item.expression, item.descending);
132 }
133 }
134
135 let mut results = self
136 .match_results_to_search_results(sorted)
137 .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
138 ctx.check_cardinality(results.len())
144 .map_err(crate::error::Error::from)
145 .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
146 if let Some(limit) = match_clause.return_clause.limit {
147 let limit = usize::try_from(limit).unwrap_or(MAX_LIMIT).min(MAX_LIMIT);
148 results.truncate(limit);
149 }
150 #[allow(clippy::cast_possible_truncation)]
153 let graph_latency_us = ctx.elapsed().as_micros() as u64;
154 self.query_planner
155 .stats()
156 .update_graph_latency(graph_latency_us);
157 self.guard_rails.circuit_breaker.record_success();
158 return Ok(results);
159 }
160
161 let stmt = &query.select;
162 let limit = usize::try_from(stmt.limit.unwrap_or(10))
164 .unwrap_or(MAX_LIMIT)
165 .min(MAX_LIMIT);
166
167 let mut vector_search = None;
169 let mut similarity_conditions: Vec<(String, Vec<f32>, crate::velesql::CompareOp, f64)> =
170 Vec::new();
171 let mut filter_condition = None;
172 let mut graph_match_predicates = Vec::new();
173
174 let is_union_query = if let Some(ref cond) = stmt.where_clause {
176 Self::has_similarity_in_problematic_or(cond)
177 } else {
178 false
179 };
180
181 let is_not_similarity_query = if let Some(ref cond) = stmt.where_clause {
183 Self::has_similarity_under_not(cond)
184 } else {
185 false
186 };
187
188 let mut sparse_vector_search = None;
190
191 if let Some(ref cond) = stmt.where_clause {
192 Self::validate_similarity_query_structure(cond)?;
194 Self::collect_graph_match_predicates(cond, &mut graph_match_predicates);
195
196 sparse_vector_search = Self::extract_sparse_vector_search(cond).cloned();
198
199 let mut extracted_cond = cond.clone();
203 vector_search = self.extract_vector_search(&mut extracted_cond, params)?;
204 similarity_conditions =
206 self.extract_all_similarity_conditions(&extracted_cond, params)?;
207 filter_condition = Some(extracted_cond);
208
209 }
212
213 let mut ef_search = None;
215 if let Some(ref with) = stmt.with_clause {
216 ef_search = with.get_ef_search();
217 }
218
219 let first_similarity = similarity_conditions.first().cloned();
221 let has_graph_predicates = !graph_match_predicates.is_empty();
222 let skip_metadata_prefilter_for_graph_or = has_graph_predicates
223 && stmt
224 .where_clause
225 .as_ref()
226 .is_some_and(Self::condition_contains_or);
227 let execution_limit = if has_graph_predicates {
228 MAX_LIMIT
229 } else {
230 limit
231 };
232
233 let (cbo_strategy, cbo_over_fetch) = {
238 let col_stats = self.get_stats();
239 self.query_planner.choose_strategy_with_cbo_and_overfetch(
240 &col_stats,
241 filter_condition.as_ref(),
242 limit,
243 )
244 };
245 tracing::debug!(
246 strategy = ?cbo_strategy,
247 over_fetch = cbo_over_fetch,
248 "CBO selected execution strategy"
249 );
250
251 if is_not_similarity_query {
254 if let Some(ref cond) = stmt.where_clause {
255 let mut results = self
256 .execute_not_similarity_query(cond, params, execution_limit)
257 .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
258 if has_graph_predicates {
259 results = self
260 .apply_where_condition_to_results(results, cond, params, &stmt.from_alias)
261 .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
262 }
263
264 if let Some(ref order_by) = stmt.order_by {
266 self.apply_order_by(&mut results, order_by, params)
267 .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
268 }
269 results.truncate(limit);
270 ctx.check_timeout()
274 .map_err(crate::error::Error::from)
275 .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
276 ctx.check_cardinality(results.len())
277 .map_err(crate::error::Error::from)
278 .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
279 self.guard_rails.circuit_breaker.record_success();
280 return Ok(results);
281 }
282 }
283
284 if is_union_query {
286 if let Some(ref cond) = stmt.where_clause {
287 let mut results = self
288 .execute_union_query(cond, params, execution_limit)
289 .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
290 if has_graph_predicates {
291 results = self
292 .apply_where_condition_to_results(results, cond, params, &stmt.from_alias)
293 .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
294 }
295
296 if let Some(ref order_by) = stmt.order_by {
298 self.apply_order_by(&mut results, order_by, params)
299 .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
300 }
301 results.truncate(limit);
302 ctx.check_timeout()
305 .map_err(crate::error::Error::from)
306 .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
307 ctx.check_cardinality(results.len())
308 .map_err(crate::error::Error::from)
309 .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
310 self.guard_rails.circuit_breaker.record_success();
311 return Ok(results);
312 }
313 }
314
315 if let Some(ref svs) = sparse_vector_search {
317 let mut results = if let Some(ref dense_vec) = vector_search {
318 let fusion_strategy = stmt.fusion_clause.as_ref().map_or_else(
320 crate::fusion::FusionStrategy::rrf_default,
321 |fc| {
322 use crate::velesql::FusionStrategyType;
323 match fc.strategy {
324 FusionStrategyType::Rsf => {
325 let dw = fc.dense_weight.unwrap_or(0.5);
326 let sw = fc.sparse_weight.unwrap_or(0.5);
327 crate::fusion::FusionStrategy::relative_score(dw, sw)
328 .unwrap_or_else(|_| {
329 crate::fusion::FusionStrategy::rrf_default()
330 })
331 }
332 FusionStrategyType::Rrf => crate::fusion::FusionStrategy::RRF {
333 k: fc.k.unwrap_or(60),
334 },
335 _ => crate::fusion::FusionStrategy::rrf_default(),
336 }
337 },
338 );
339 self.execute_hybrid_search_with_strategy(
340 dense_vec,
341 svs,
342 params,
343 filter_condition.as_ref(),
344 execution_limit,
345 &fusion_strategy,
346 )
347 .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?
348 } else {
349 self.execute_sparse_search(svs, params, filter_condition.as_ref(), execution_limit)
351 .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?
352 };
353
354 if has_graph_predicates {
355 if let Some(cond) = stmt.where_clause.as_ref() {
356 results = self
357 .apply_where_condition_to_results(results, cond, params, &stmt.from_alias)
358 .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
359 }
360 }
361
362 ctx.check_timeout()
363 .map_err(crate::error::Error::from)
364 .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
365 ctx.check_cardinality(results.len())
366 .map_err(crate::error::Error::from)
367 .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
368
369 if stmt.distinct == crate::velesql::DistinctMode::All {
370 results = distinct::apply_distinct(results, &stmt.columns);
371 }
372 if let Some(ref order_by) = stmt.order_by {
373 self.apply_order_by(&mut results, order_by, params)?;
374 }
375 results.truncate(limit);
376 self.guard_rails.circuit_breaker.record_success();
377 return Ok(results);
378 }
379
380 let mut results = self
383 .dispatch_vector_query(
384 vector_search.as_ref(),
385 first_similarity.as_ref(),
386 &similarity_conditions,
387 filter_condition.as_ref(),
388 execution_limit,
389 skip_metadata_prefilter_for_graph_or,
390 ef_search,
391 cbo_strategy,
392 cbo_over_fetch,
393 )
394 .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
395
396 if has_graph_predicates {
397 if let Some(cond) = stmt.where_clause.as_ref() {
398 results = self
399 .apply_where_condition_to_results(results, cond, params, &stmt.from_alias)
400 .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
401 }
402 }
403
404 if !stmt.joins.is_empty() {
414 if let Some(ref cond) = stmt.where_clause {
415 let graph_vars: std::collections::HashSet<String> =
418 stmt.from_alias.iter().cloned().collect();
419 let join_tables = pushdown::extract_join_tables(&stmt.joins);
420 let analysis = pushdown::analyze_for_pushdown(cond, &graph_vars, &join_tables);
421 tracing::debug!(
422 column_store_filters = analysis.column_store_filters.len(),
423 graph_filters = analysis.graph_filters.len(),
424 post_join_filters = analysis.post_join_filters.len(),
425 has_pushdown = analysis.has_pushdown(),
426 "JOIN pushdown analysis complete"
427 );
428 }
429 }
430
431 ctx.check_timeout()
433 .map_err(crate::error::Error::from)
434 .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
435
436 ctx.check_cardinality(results.len())
440 .map_err(crate::error::Error::from)
441 .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
442
443 if stmt.distinct == crate::velesql::DistinctMode::All {
445 results = distinct::apply_distinct(results, &stmt.columns);
446 }
447
448 if let Some(ref order_by) = stmt.order_by {
450 self.apply_order_by(&mut results, order_by, params)?;
451 }
452
453 results.truncate(limit);
455
456 if vector_search.is_some() {
459 #[allow(clippy::cast_possible_truncation)]
461 let vector_latency_us = ctx.elapsed().as_micros() as u64;
462 self.query_planner
463 .stats()
464 .update_vector_latency(vector_latency_us);
465 }
466 self.guard_rails.circuit_breaker.record_success();
467 Ok(results)
468 }
469
470 pub fn execute_query_str(
484 &self,
485 sql: &str,
486 params: &std::collections::HashMap<String, serde_json::Value>,
487 ) -> Result<Vec<SearchResult>> {
488 let query = self
489 .query_cache
490 .parse(sql)
491 .map_err(|e| crate::error::Error::Query(e.to_string()))?;
492 self.execute_query(&query, params)
493 }
494
495 }