velesdb_core/collection/search/query/
mod.rs1#![allow(clippy::uninlined_format_args)] #![allow(clippy::implicit_hasher)] mod aggregation;
23pub(crate) mod condition_tree;
24mod distinct;
25#[cfg(test)]
26mod distinct_tests;
27mod execution_paths;
28mod extraction;
29#[cfg(test)]
30mod extraction_tests;
31mod hybrid_sparse;
32#[cfg(test)]
33mod hybrid_sparse_tests;
34pub mod join;
35#[cfg(test)]
36mod join_tests;
37pub mod match_exec;
38#[cfg(test)]
39mod match_exec_tests;
40pub mod match_metrics;
41#[cfg(test)]
42mod match_metrics_tests;
43pub mod match_planner;
44#[cfg(test)]
45mod match_planner_tests;
46mod multi_vector;
47#[cfg(test)]
48mod multi_vector_tests;
49mod ordering;
50#[cfg(test)]
51mod ordering_tests;
52pub mod parallel_traversal;
53#[cfg(test)]
54mod parallel_traversal_tests;
55pub mod projection;
56pub mod pushdown;
57#[cfg(test)]
58mod pushdown_tests;
59pub mod score_fusion;
60#[cfg(test)]
61mod score_fusion_tests;
62mod select_dispatch;
63pub(crate) mod set_operations;
64mod similarity_filter;
65mod sparse_dispatch;
66mod union_query;
67mod validation;
68mod where_eval;
69
70#[allow(unused_imports)]
72pub use ordering::compare_json_values;
73#[allow(unused_imports)]
75pub use join::{execute_join, JoinedResult};
76
77use crate::collection::types::Collection;
78use crate::error::Result;
79use crate::point::SearchResult;
80use std::collections::HashSet;
81
/// Hard ceiling on the number of results a single query execution may request.
///
/// Within this module it serves three purposes:
/// - user-supplied `LIMIT` values are clamped to it,
/// - the sub-queries of a compound (set-operation) query run with this limit so the
///   set operation sees the widest permitted candidate set,
/// - early-return paths that must re-filter results for graph predicates use it as
///   their working limit before the caller's limit is applied.
const MAX_LIMIT: usize = 100_000;
84
85struct EarlyReturnCtx<'a> {
87 stmt: &'a crate::velesql::SelectStatement,
88 params: &'a std::collections::HashMap<String, serde_json::Value>,
89 cond: &'a crate::velesql::Condition,
90 has_graph_predicates: bool,
91 limit: usize,
92 ctx: &'a crate::guardrails::QueryContext,
93}
94
95struct ExtractedComponents {
97 vector_search: Option<Vec<f32>>,
98 similarity_conditions: Vec<(String, Vec<f32>, crate::velesql::CompareOp, f64)>,
99 filter_condition: Option<crate::velesql::Condition>,
100 graph_match_predicates: Vec<crate::velesql::GraphMatchPredicate>,
101 sparse_vector_search: Option<crate::velesql::SparseVectorSearch>,
102 is_union_query: bool,
103 is_not_similarity_query: bool,
104}
105
106impl Collection {
107 pub fn execute_query(
118 &self,
119 query: &crate::velesql::Query,
120 params: &std::collections::HashMap<String, serde_json::Value>,
121 ) -> Result<Vec<SearchResult>> {
122 let compound_limit = Some(u64::try_from(MAX_LIMIT).unwrap_or(u64::MAX));
127 let left_results = if query.compound.is_some() {
128 let mut left_query = query.clone();
129 left_query.select.limit = compound_limit;
130 left_query.compound = None;
131 self.execute_query_with_client(&left_query, params, "default")?
132 } else {
133 return self.execute_query_with_client(query, params, "default");
134 };
135
136 if let Some(ref compound) = query.compound {
138 let mut accumulated = left_results;
139 for (operator, right_select) in &compound.operations {
140 let mut right_query = crate::velesql::Query::new_select(right_select.clone());
141 right_query.select.limit = compound_limit;
142 let right_results =
143 self.execute_query_with_client(&right_query, params, "default")?;
144 accumulated =
145 set_operations::apply_set_operation(accumulated, right_results, *operator);
146 }
147 if let Some(limit) = query.select.limit {
149 accumulated.truncate(usize::try_from(limit).unwrap_or(usize::MAX));
150 }
151 return Ok(accumulated);
152 }
153
154 Ok(left_results)
155 }
156
157 pub fn execute_query_with_client(
166 &self,
167 query: &crate::velesql::Query,
168 params: &std::collections::HashMap<String, serde_json::Value>,
169 client_id: &str,
170 ) -> Result<Vec<SearchResult>> {
171 self.guard_rails
173 .pre_check(client_id)
174 .map_err(crate::error::Error::from)?;
175
176 let ctx = self.guard_rails.create_context();
178
179 crate::velesql::QueryValidator::validate(query)
180 .map_err(|e| crate::error::Error::Query(e.to_string()))?;
181
182 if let Some(match_clause) = query.match_clause.as_ref() {
184 return self.dispatch_match_query(match_clause, params, &ctx);
185 }
186
187 let stmt = &query.select;
188 let limit = usize::try_from(stmt.limit.unwrap_or(10))
189 .unwrap_or(MAX_LIMIT)
190 .min(MAX_LIMIT);
191
192 let extracted = self.extract_query_components(stmt, params)?;
193
194 if let Some(results) = self.try_early_return_path(stmt, params, &extracted, limit, &ctx)? {
196 return Ok(results);
197 }
198
199 let mut results = self.dispatch_main_select(stmt, params, &extracted, limit, &ctx)?;
201
202 self.analyze_join_pushdown(stmt);
204
205 self.check_guardrails_and_record(&ctx, results.len())?;
207
208 results = self.apply_select_postprocessing(stmt, results, params, limit)?;
210
211 if extracted.vector_search.is_some() {
213 #[allow(clippy::cast_possible_truncation)]
215 let vector_latency_us = ctx.elapsed().as_micros() as u64;
216 self.query_planner
217 .stats()
218 .update_vector_latency(vector_latency_us);
219 }
220 self.guard_rails.circuit_breaker.record_success();
221 Ok(results)
222 }
223
224 fn extract_query_components(
226 &self,
227 stmt: &crate::velesql::SelectStatement,
228 params: &std::collections::HashMap<String, serde_json::Value>,
229 ) -> Result<ExtractedComponents> {
230 let mut vector_search = None;
231 let mut similarity_conditions = Vec::new();
232 let mut filter_condition = None;
233 let mut graph_match_predicates = Vec::new();
234 let mut sparse_vector_search = None;
235
236 let is_union_query = stmt
237 .where_clause
238 .as_ref()
239 .is_some_and(Self::has_similarity_in_problematic_or);
240 let is_not_similarity_query = stmt
241 .where_clause
242 .as_ref()
243 .is_some_and(Self::has_similarity_under_not);
244
245 if let Some(ref cond) = stmt.where_clause {
246 Self::validate_similarity_query_structure(cond)?;
247 Self::collect_graph_match_predicates(cond, &mut graph_match_predicates);
248 sparse_vector_search = Self::extract_sparse_vector_search(cond).cloned();
249
250 let mut extracted_cond = cond.clone();
251 vector_search = self.extract_vector_search(&mut extracted_cond, params)?;
252 similarity_conditions =
253 self.extract_all_similarity_conditions(&extracted_cond, params)?;
254 filter_condition = Some(extracted_cond);
255 }
256
257 Ok(ExtractedComponents {
258 vector_search,
259 similarity_conditions,
260 filter_condition,
261 graph_match_predicates,
262 sparse_vector_search,
263 is_union_query,
264 is_not_similarity_query,
265 })
266 }
267
268 fn try_early_return_path(
272 &self,
273 stmt: &crate::velesql::SelectStatement,
274 params: &std::collections::HashMap<String, serde_json::Value>,
275 extracted: &ExtractedComponents,
276 limit: usize,
277 ctx: &crate::guardrails::QueryContext,
278 ) -> Result<Option<Vec<SearchResult>>> {
279 if let Some(results) =
280 self.try_not_similarity_or_union(stmt, params, extracted, limit, ctx)?
281 {
282 return Ok(Some(results));
283 }
284
285 if let Some(ref svs) = extracted.sparse_vector_search {
287 let results = self.dispatch_sparse_query(stmt, params, extracted, svs, limit, ctx)?;
288 return Ok(Some(results));
289 }
290
291 Ok(None)
292 }
293
294 fn try_not_similarity_or_union(
296 &self,
297 stmt: &crate::velesql::SelectStatement,
298 params: &std::collections::HashMap<String, serde_json::Value>,
299 extracted: &ExtractedComponents,
300 limit: usize,
301 ctx: &crate::guardrails::QueryContext,
302 ) -> Result<Option<Vec<SearchResult>>> {
303 let cond = match stmt.where_clause.as_ref() {
304 Some(c) if extracted.is_not_similarity_query || extracted.is_union_query => c,
305 _ => return Ok(None),
306 };
307
308 let has_graph_predicates = !extracted.graph_match_predicates.is_empty();
309 let execution_limit = if has_graph_predicates {
310 MAX_LIMIT
311 } else {
312 limit
313 };
314
315 let early_ctx = EarlyReturnCtx {
316 stmt,
317 params,
318 cond,
319 has_graph_predicates,
320 limit,
321 ctx,
322 };
323
324 if extracted.is_not_similarity_query {
326 let results = self.execute_early_return_query(
327 |s| s.execute_not_similarity_query(cond, params, execution_limit),
328 &early_ctx,
329 )?;
330 return Ok(Some(results));
331 }
332
333 let results = self.execute_early_return_query(
335 |s| s.execute_union_query(cond, params, execution_limit),
336 &early_ctx,
337 )?;
338 Ok(Some(results))
339 }
340
341 fn execute_early_return_query(
343 &self,
344 execute_fn: impl FnOnce(&Self) -> Result<Vec<SearchResult>>,
345 early: &EarlyReturnCtx<'_>,
346 ) -> Result<Vec<SearchResult>> {
347 let mut results =
348 execute_fn(self).inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
349 if early.has_graph_predicates {
350 results = self
351 .apply_where_condition_to_results(
352 results,
353 early.cond,
354 early.params,
355 &early.stmt.from_alias,
356 )
357 .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
358 }
359 if let Some(ref order_by) = early.stmt.order_by {
360 self.apply_order_by(&mut results, order_by, early.params)
361 .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
362 }
363 results.truncate(early.limit);
364 self.check_guardrails_and_record(early.ctx, results.len())?;
365 self.guard_rails.circuit_breaker.record_success();
366 Ok(results)
367 }
368
369 fn check_guardrails_and_record(
377 &self,
378 ctx: &crate::guardrails::QueryContext,
379 result_count: usize,
380 ) -> Result<()> {
381 ctx.check_timeout()
382 .map_err(crate::error::Error::from)
383 .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
384 ctx.check_cardinality(result_count)
385 .map_err(crate::error::Error::from)
386 .inspect_err(|_| self.guard_rails.circuit_breaker.record_failure())?;
387 Ok(())
388 }
389
390 pub fn execute_query_str(
404 &self,
405 sql: &str,
406 params: &std::collections::HashMap<String, serde_json::Value>,
407 ) -> Result<Vec<SearchResult>> {
408 let query = self
409 .query_cache
410 .parse(sql)
411 .map_err(|e| crate::error::Error::Query(e.to_string()))?;
412 self.execute_query(&query, params)
413 }
414
415 }