velesdb_core/database/query_engine.rs
1//! Query execution: `execute_query`, `explain_query`, `explain_analyze_query`, plan caching, and DML dispatch.
2
3use crate::velesql::{
4 ActualStats, AdminStatement, DdlStatement, DmlStatement, ExplainOutput, IntrospectionStatement,
5 Query, TrainStatement,
6};
7use crate::{Error, Result, SearchResult};
8
9use super::Database;
10
11/// Statement type classification for dispatch routing.
12enum StatementType<'a> {
13 Admin(&'a AdminStatement),
14 Introspection(&'a IntrospectionStatement),
15 Ddl(&'a DdlStatement),
16 Train(&'a TrainStatement),
17 Dml(&'a DmlStatement),
18 Match,
19 Select,
20}
21
22/// Classifies a query into its statement type for routing.
23fn classify_statement(query: &Query) -> StatementType<'_> {
24 if let Some(admin) = query.admin.as_ref() {
25 return StatementType::Admin(admin);
26 }
27 if let Some(intro) = query.introspection.as_ref() {
28 return StatementType::Introspection(intro);
29 }
30 if let Some(ddl) = query.ddl.as_ref() {
31 return StatementType::Ddl(ddl);
32 }
33 if let Some(train) = query.train.as_ref() {
34 return StatementType::Train(train);
35 }
36 if let Some(dml) = query.dml.as_ref() {
37 return StatementType::Dml(dml);
38 }
39 if query.is_match_query() {
40 return StatementType::Match;
41 }
42 StatementType::Select
43}
44
45impl Database {
46 /// Produces a canonical JSON string for a `serde_json::Value`.
47 ///
48 /// Recursively sorts the keys of every JSON object so that two values
49 /// representing the same logical structure always produce identical bytes,
50 /// regardless of the `HashMap` iteration order used during serialization.
51 ///
52 /// This is required because `FusionConfig::params` and
53 /// `TrainStatement::params` are `HashMap`-backed; `serde_json` serialises
54 /// them in hash-order, which is non-deterministic across invocations.
55 fn canonical_json(value: serde_json::Value) -> serde_json::Value {
56 match value {
57 serde_json::Value::Object(map) => {
58 // Without the `preserve_order` feature flag, `serde_json::Map` is already
59 // backed by `BTreeMap` and therefore already sorted. This explicit sort
60 // step is kept as defense-in-depth: if `preserve_order` is ever enabled
61 // in `Cargo.toml` (which switches the backing store to `IndexMap` and
62 // preserves insertion order), the canonical key ordering is still upheld
63 // without any change to this function.
64 let sorted: serde_json::Map<String, serde_json::Value> = map
65 .into_iter()
66 .map(|(k, v)| (k, Self::canonical_json(v)))
67 .collect::<std::collections::BTreeMap<_, _>>()
68 .into_iter()
69 .collect();
70 serde_json::Value::Object(sorted)
71 }
72 serde_json::Value::Array(arr) => {
73 serde_json::Value::Array(arr.into_iter().map(Self::canonical_json).collect())
74 }
75 other => other,
76 }
77 }
78
79 /// Builds a deterministic cache key for a query (CACHE-02).
80 ///
81 /// Serialises the query to canonical JSON (object keys sorted recursively),
82 /// reads the current `schema_version`, and gathers per-collection
83 /// `write_generation` counters (sorted by collection name) to form a
84 /// `PlanKey`.
85 ///
86 /// # Why canonical JSON instead of `Debug`
87 ///
88 /// `format!("{query:?}")` is non-deterministic when the `Query` AST
89 /// contains `HashMap`-backed fields (`FusionConfig::params`,
90 /// `TrainStatement::params`) because `HashMap` iteration order is not
91 /// guaranteed across invocations. Canonical JSON with sorted object keys
92 /// is stable and produces the same byte sequence for logically identical
93 /// queries.
94 #[must_use]
95 pub fn build_plan_key(&self, query: &crate::velesql::Query) -> crate::cache::PlanKey {
96 use std::hash::{BuildHasher, Hasher};
97
98 // Serialise via serde_json, then canonicalise (sort object keys) before hashing.
99 // Fallback to Debug representation if serialization fails (should never happen in
100 // practice since all Query fields are Serialize, but erring on the side of liveness).
101 let query_text = serde_json::to_value(query)
102 .map(Self::canonical_json)
103 .and_then(|v| serde_json::to_string(&v))
104 .unwrap_or_else(|_| format!("{query:?}"));
105
106 let mut hasher = rustc_hash::FxBuildHasher.build_hasher();
107 hasher.write(query_text.as_bytes());
108 let query_hash = hasher.finish();
109
110 let schema_version = self.schema_version();
111 let collection_names = Self::referenced_collection_names(query);
112
113 // Build generations vector in sorted collection order.
114 let collection_generations: smallvec::SmallVec<[u64; 4]> = collection_names
115 .iter()
116 .map(|name| self.collection_write_generation(name).unwrap_or(0))
117 .collect();
118
119 // Issue #608: parallel vector of analyze generations so that running
120 // ANALYZE alone (no data mutation) still flips the cache key and
121 // rebuilds plans with the fresh calibrated cost estimates.
122 let analyze_generations: smallvec::SmallVec<[u64; 4]> = collection_names
123 .iter()
124 .map(|name| self.collection_analyze_generation(name).unwrap_or(0))
125 .collect();
126
127 crate::cache::PlanKey {
128 // Issue #902: store the canonical text so PlanKey equality is
129 // collision-safe. query_hash stays a Hash accelerator only.
130 query_text: query_text.into(),
131 query_hash,
132 schema_version,
133 collection_generations,
134 analyze_generations,
135 }
136 }
137
138 /// Returns the query plan for a query, with cache status populated (CACHE-02).
139 ///
140 /// If the plan is cached, returns it with `cache_hit: Some(true)` and
141 /// `plan_reuse_count` set. Otherwise generates a fresh plan with
142 /// `cache_hit: Some(false)`.
143 ///
144 /// # Design decision: `explain_query` does not populate the cache
145 ///
146 /// `explain_query` intentionally does **not** insert a new plan into the
147 /// compiled plan cache. EXPLAIN is a diagnostic operation; allowing it to
148 /// influence cache state would make cache metrics (hit/miss ratios,
149 /// `plan_reuse_count`) unreliable because EXPLAIN calls would be
150 /// indistinguishable from real execution hits. Only `execute_query` is
151 /// authorised to write to the cache.
152 ///
153 /// # Errors
154 ///
155 /// Returns an error if the query is invalid.
156 pub fn explain_query(
157 &self,
158 query: &crate::velesql::Query,
159 ) -> Result<crate::velesql::QueryPlan> {
160 crate::velesql::QueryValidator::validate(query).map_err(|e| Error::Query(e.to_string()))?;
161
162 let plan_key = self.build_plan_key(query);
163
164 if let Some(cached) = self.compiled_plan_cache.get(&plan_key) {
165 let mut plan = cached.plan.clone();
166 plan.cache_hit = Some(true);
167 plan.plan_reuse_count = Some(
168 cached
169 .reuse_count
170 .load(std::sync::atomic::Ordering::Relaxed),
171 );
172 return Ok(plan);
173 }
174
175 let mut plan = self.build_plan_with_stats(query);
176 plan.cache_hit = Some(false);
177 plan.plan_reuse_count = Some(0);
178 Ok(plan)
179 }
180
181 /// Builds a query plan, resolving calibrated collection statistics AND
182 /// the registered secondary index set from the registry when available
183 /// (#471 — EXPLAIN real costs, #607 — `IndexLookup` wiring).
184 ///
185 /// The returned plan's `estimated_cost_ms` and `filter_strategy` are
186 /// calibrated via `CostEstimator` when stats exist for the query's
187 /// primary collection. Falls back to heuristics otherwise. The
188 /// `indexed_fields` argument is populated from
189 /// `Database::indexed_fields_for` so that `IndexLookup` nodes appear
190 /// in the EXPLAIN tree for WHERE clauses targeting indexed columns.
191 fn build_plan_with_stats(&self, query: &crate::velesql::Query) -> crate::velesql::QueryPlan {
192 let primary = &query.select.from;
193 let core_stats = self.get_collection_stats(primary).ok().flatten();
194 let indexed = self.indexed_fields_for(primary);
195 crate::velesql::QueryPlan::from_query_with_stats(query, &indexed, core_stats.as_ref())
196 }
197
198 /// Executes a query with instrumentation and returns both plan and actual stats.
199 ///
200 /// Unlike `explain_query` (plan only) and `execute_query` (results only),
201 /// this method returns the full [`ExplainOutput`] with measured statistics.
202 /// The normal `execute_query` path is untouched — zero overhead on
203 /// non-ANALYZE queries.
204 ///
205 /// # Errors
206 ///
207 /// Returns an error if the query is invalid or execution fails.
208 pub fn explain_analyze_query(
209 &self,
210 query: &Query,
211 params: &std::collections::HashMap<String, serde_json::Value>,
212 ) -> Result<ExplainOutput> {
213 crate::velesql::QueryValidator::validate(query).map_err(|e| Error::Query(e.to_string()))?;
214
215 let plan = self.explain_query(query)?;
216 let start = std::time::Instant::now();
217 let results = self.execute_query(query, params)?;
218 let elapsed = start.elapsed();
219
220 let actual_rows = results.len() as u64;
221 let actual_time_ms = elapsed.as_secs_f64() * 1000.0;
222 let is_match = query.is_match_query();
223 let (nodes_visited, edges_traversed) = if is_match {
224 (actual_rows, actual_rows)
225 } else {
226 (0, 0)
227 };
228
229 let stats = ActualStats {
230 actual_rows,
231 actual_time_ms,
232 loops: 1,
233 nodes_visited,
234 edges_traversed,
235 };
236
237 let node_stats =
238 crate::velesql::build_leaf_node_stats(&plan.root, actual_rows, actual_time_ms);
239 Ok(ExplainOutput::with_stats(plan, stats, node_stats))
240 }
241
242 /// Executes a `VelesQL` query with database-level JOIN resolution.
243 ///
244 /// This method resolves JOIN target collections from the database registry
245 /// and executes JOIN runtime in sequence. Query plans are cached and
246 /// reused for identical queries against unchanged collections (CACHE-02).
247 ///
248 /// # Errors
249 ///
250 /// Returns an error if the base collection or any JOIN collection is missing.
251 pub fn execute_query(
252 &self,
253 query: &crate::velesql::Query,
254 params: &std::collections::HashMap<String, serde_json::Value>,
255 ) -> Result<Vec<SearchResult>> {
256 crate::velesql::QueryValidator::validate(query).map_err(|e| Error::Query(e.to_string()))?;
257
258 if let Some(results) = self.dispatch_non_select(query, params)? {
259 return Ok(results);
260 }
261
262 // Build plan key and check cache WITHOUT recording hit/miss metrics (CACHE-02).
263 //
264 // `contains()` is used instead of `get().is_some()` so that this
265 // existence check does not increment the hit/miss counters or
266 // `reuse_count`. Only `explain_query` (which surfaces these values to
267 // callers) should call `get()`.
268 let pre_exec_key = self.build_plan_key(query);
269 let is_cached = self.compiled_plan_cache.contains(&pre_exec_key);
270
271 let results = self.execute_select_query(query, params)?;
272
273 // Populate cache on miss (CACHE-02).
274 //
275 // C-1 TOCTOU fix: rebuild the plan key AFTER execution. Between the
276 // pre-execution `contains()` check and here, a concurrent writer may
277 // have bumped a collection's `write_generation` (e.g. via `upsert` on
278 // another thread). Rebuilding the key captures the post-execution
279 // state, so the cached plan is associated with the generation that was
280 // live when the plan was actually compiled — not a potentially stale
281 // pre-execution snapshot.
282 if !is_cached {
283 self.populate_plan_cache(query);
284 }
285
286 Ok(results)
287 }
288
289 /// Classifies and dispatches non-SELECT statement types.
290 ///
291 /// Returns `Ok(Some(results))` if handled, `Ok(None)` for SELECT queries.
292 fn dispatch_non_select(
293 &self,
294 query: &crate::velesql::Query,
295 params: &std::collections::HashMap<String, serde_json::Value>,
296 ) -> Result<Option<Vec<SearchResult>>> {
297 // Classify the statement type (at most one is Some).
298 let stmt_type = classify_statement(query);
299 match stmt_type {
300 StatementType::Admin(admin) => Ok(Some(self.execute_admin(admin)?)),
301 StatementType::Introspection(intro) => Ok(Some(self.execute_introspection(intro)?)),
302 StatementType::Ddl(ddl) => Ok(Some(self.execute_ddl(ddl)?)),
303 StatementType::Train(train) => Ok(Some(self.execute_train(train)?)),
304 StatementType::Dml(dml) => Ok(Some(self.execute_dml(dml, params)?)),
305 StatementType::Match => {
306 // Route MATCH queries to the target collection.
307 // Resolution order:
308 // 1. select.from (e.g. "SELECT * FROM kg WHERE MATCH ...")
309 // 2. "_collection" key in params (programmatic API)
310 // 3. Error with guidance
311 let collection_name = if !query.select.from.is_empty() {
312 query.select.from.clone()
313 } else if let Some(serde_json::Value::String(name)) = params.get("_collection") {
314 name.clone()
315 } else {
316 return Err(Error::Query(
317 "MATCH query requires a target collection. Either use \
318 SELECT ... FROM <collection> WHERE MATCH ..., or pass \
319 {\"_collection\": \"name\"} in params."
320 .to_string(),
321 ));
322 };
323 let coll = self.resolve_collection(&collection_name)?;
324 let mut results = coll.execute_query(query, params)?;
325
326 // Cross-collection enrichment: if any node pattern has a
327 // @collection annotation, look up payloads from those
328 // collections and merge into the projected fields.
329 if let Some(mc) = &query.match_clause {
330 self.enrich_match_results_cross_collection(mc, &mut results);
331 }
332
333 Ok(Some(results))
334 }
335 StatementType::Select => Ok(None),
336 }
337 }
338
339 /// Executes the SELECT portion of a query, resolving JOINs if present.
340 fn execute_select_query(
341 &self,
342 query: &crate::velesql::Query,
343 params: &std::collections::HashMap<String, serde_json::Value>,
344 ) -> Result<Vec<SearchResult>> {
345 // EPIC-040 US-006: For compound queries, strip LIMIT from each operand so
346 // the set operation sees the full result sets. The final LIMIT is applied
347 // once on the merged output (SQL-standard behaviour).
348 // Use MAX_LIMIT (not None) to avoid the default-10 cap downstream.
349 const COMPOUND_LIMIT: usize = 100_000;
350 let compound_limit = Some(COMPOUND_LIMIT as u64); // 100_000 fits u64 exactly.
351 let left_results = if query.compound.is_some() {
352 let mut left_query = query.clone();
353 left_query.select.limit = compound_limit;
354 self.execute_single_select(&left_query, params)?
355 } else {
356 return self.execute_single_select(query, params);
357 };
358
359 // compound is guaranteed Some here (non-compound returns above).
360 if let Some(ref compound) = query.compound {
361 let mut accumulated = left_results;
362 for (operator, right_select) in &compound.operations {
363 let mut right_query = crate::velesql::Query::new_select(right_select.clone());
364 right_query.select.limit = compound_limit;
365 let right_results = self.execute_single_select(&right_query, params)?;
366 accumulated = crate::collection::search::query::set_operations::apply_set_operation(
367 accumulated,
368 right_results,
369 *operator,
370 // Intermediate ops keep the server-side ceiling: truncating to the
371 // user LIMIT here would drop rows a later chained set op still needs.
372 COMPOUND_LIMIT,
373 );
374 }
375 // SQL-standard: LIMIT from the left (outer) SELECT applies to the final result.
376 if let Some(limit) = query.select.limit {
377 accumulated.truncate(usize::try_from(limit).unwrap_or(usize::MAX));
378 }
379 return Ok(accumulated);
380 }
381
382 Ok(left_results)
383 }
384
385 /// Collects sorted, deduplicated collection names referenced by a query,
386 /// including all compound operands (UNION, INTERSECT, EXCEPT).
387 ///
388 /// RF-DEDUP: Shared by `build_plan_key` and `populate_plan_cache`, which
389 /// both need the same sorted collection-name list from the query AST.
390 fn referenced_collection_names(query: &crate::velesql::Query) -> Vec<String> {
391 let mut names = vec![query.select.from.clone()];
392 for join in &query.select.joins {
393 names.push(join.table.clone());
394 }
395 if let Some(ref compound) = query.compound {
396 for (_, right_select) in &compound.operations {
397 names.push(right_select.from.clone());
398 for join in &right_select.joins {
399 names.push(join.table.clone());
400 }
401 }
402 }
403 names.sort();
404 names.dedup();
405 names
406 }
407
408 /// Resolves a collection by name from all typed registries.
409 ///
410 /// Priority: vector collections first, then graph, then metadata.
411 /// Returns the inner `Collection` for query execution.
412 pub(super) fn resolve_collection(&self, name: &str) -> Result<crate::collection::Collection> {
413 if let Some(vc) = self.get_vector_collection(name) {
414 return Ok(vc.inner);
415 }
416 if let Some(gc) = self.get_graph_collection(name) {
417 return Ok(gc.inner);
418 }
419 if let Some(mc) = self.get_metadata_collection(name) {
420 return Ok(mc.inner);
421 }
422 Err(Error::CollectionNotFound(name.to_string()))
423 }
424
425 /// Resolves a collection that supports write operations (INSERT/UPDATE/TRAIN).
426 ///
427 /// Checks vector, graph, and metadata collections. Metadata-only collections
428 /// support INSERT/UPDATE for metadata fields (no vectors).
429 pub(super) fn resolve_writable_collection(
430 &self,
431 name: &str,
432 ) -> Result<crate::collection::Collection> {
433 if let Some(vc) = self.get_vector_collection(name) {
434 return Ok(vc.inner);
435 }
436 if let Some(gc) = self.get_graph_collection(name) {
437 return Ok(gc.inner);
438 }
439 if let Some(mc) = self.get_metadata_collection(name) {
440 return Ok(mc.inner);
441 }
442 Err(Error::CollectionNotFound(name.to_string()))
443 }
444
445 /// Executes a single SELECT (no compound), resolving JOINs if present.
446 ///
447 /// Orchestrates filter pushdown and join strategy selection:
448 /// 1. Analyze WHERE for pushdown-eligible conditions
449 /// 2. Strip pushed conditions from base query
450 /// 3. For each JOIN: lookup, filtered, or full `ColumnStore` path
451 /// 4. Apply post-join filters (cross-source predicates)
452 fn execute_single_select(
453 &self,
454 query: &crate::velesql::Query,
455 params: &std::collections::HashMap<String, serde_json::Value>,
456 ) -> Result<Vec<SearchResult>> {
457 let base_collection = self.resolve_collection(&query.select.from)?;
458
459 let mut single_query = query.clone();
460 single_query.compound = None;
461
462 if single_query.select.joins.is_empty() {
463 return base_collection.execute_query(&single_query, params);
464 }
465
466 let analysis = Self::prepare_join_pushdown(&mut single_query, params)?;
467 let pushed = analysis.column_store_filters.clone();
468
469 let row_budget = Self::join_row_budget(&query.select, &analysis);
470
471 let mut results = base_collection.execute_query(&single_query, params)?;
472 for join in &query.select.joins {
473 results = self.execute_single_join(&results, join, &pushed, row_budget)?;
474 }
475
476 // Apply post-join filters: cross-source predicates that reference
477 // columns from both the base collection and joined ColumnStore tables.
478 if !analysis.post_join_filters.is_empty() {
479 results = Self::apply_post_join_filters(
480 &base_collection,
481 results,
482 &analysis.post_join_filters,
483 params,
484 &query.select.from_alias,
485 )?;
486 }
487
488 Ok(results)
489 }
490
491 /// Resolves WHERE parameters, runs pushdown analysis, and strips pushed
492 /// conditions from the base query (JOIN path of `execute_single_select`).
493 ///
494 /// Parameter placeholders are resolved before the analysis so pushed-down
495 /// filters never silently convert them to NULL at the `ColumnStore` layer
496 /// (the collection pipeline resolves its own copy independently).
497 fn prepare_join_pushdown(
498 single_query: &mut crate::velesql::Query,
499 params: &std::collections::HashMap<String, serde_json::Value>,
500 ) -> Result<crate::collection::search::query::pushdown::PushdownAnalysis> {
501 if let Some(cond) = single_query.select.where_clause.take() {
502 single_query.select.where_clause = Some(
503 crate::collection::Collection::resolve_condition_params(&cond, params)?,
504 );
505 }
506
507 let analysis = Self::analyze_join_pushdown_for_select(&single_query.select);
508
509 let resolved_where = single_query.select.where_clause.clone();
510 single_query.select.joins.clear();
511 if !analysis.column_store_filters.is_empty() {
512 single_query.select.where_clause = Self::strip_pushed_conditions(
513 resolved_where.as_ref(),
514 &analysis.column_store_filters,
515 );
516 }
517 Ok(analysis)
518 }
519
520 /// Computes the bound on joined rows to materialize.
521 ///
522 /// When the query has an explicit LIMIT, no post-join filters, and no
523 /// ORDER BY (which could reorder past the window), the bound is the
524 /// effective `LIMIT + OFFSET`. GROUP BY / HAVING / DISTINCT also disqualify
525 /// the bounded shape: SQL LIMIT bounds output *groups/rows*, not input rows,
526 /// so truncating joined input to `LIMIT` would drop rows that belong to
527 /// groups still inside the window. Otherwise downstream stages may reorder or
528 /// drop rows, so we fall back to the conservative server-side ceiling
529 /// [`JOIN_ROW_CEILING`] — still bounding OOM without affecting correctness.
530 pub(super) fn join_row_budget(
531 select: &crate::velesql::SelectStatement,
532 analysis: &crate::collection::search::query::pushdown::PushdownAnalysis,
533 ) -> usize {
534 use crate::collection::search::query::JOIN_ROW_CEILING;
535 use crate::velesql::DistinctMode;
536 let bounded_shape = analysis.post_join_filters.is_empty()
537 && select.order_by.is_none()
538 && select.group_by.is_none()
539 && select.having.is_none()
540 && select.distinct == DistinctMode::None;
541 match select.limit {
542 Some(limit) if bounded_shape => {
543 let limit = usize::try_from(limit).unwrap_or(JOIN_ROW_CEILING);
544 let offset = select
545 .offset
546 .map_or(0, |o| usize::try_from(o).unwrap_or(JOIN_ROW_CEILING));
547 limit.saturating_add(offset).min(JOIN_ROW_CEILING)
548 }
549 _ => JOIN_ROW_CEILING,
550 }
551 }
552
553 // NOTE: analyze_join_pushdown_for_select, apply_post_join_filters
554 // moved to join_pushdown.rs (NLOC/file reduction)
555
556 /// Inserts a compiled plan into the cache after a cache miss (CACHE-02).
557 fn populate_plan_cache(&self, query: &crate::velesql::Query) {
558 let compiled = std::sync::Arc::new(crate::cache::CompiledPlan {
559 plan: self.build_plan_with_stats(query),
560 referenced_collections: Self::referenced_collection_names(query),
561 compiled_at: std::time::Instant::now(),
562 reuse_count: std::sync::atomic::AtomicU64::new(0),
563 });
564 // Rebuild key after execution to reflect current write_generation (C-1).
565 let post_exec_key = self.build_plan_key(query);
566 self.compiled_plan_cache.insert(post_exec_key, compiled);
567 }
568
569 /// Dispatches a DML statement (INSERT, UPSERT, UPDATE, DELETE, or edge mutations).
570 pub(super) fn execute_dml(
571 &self,
572 dml: &crate::velesql::DmlStatement,
573 params: &std::collections::HashMap<String, serde_json::Value>,
574 ) -> Result<Vec<SearchResult>> {
575 match dml {
576 crate::velesql::DmlStatement::Insert(stmt)
577 | crate::velesql::DmlStatement::Upsert(stmt) => self.execute_insert(stmt, params),
578 crate::velesql::DmlStatement::Update(stmt) => self.execute_update(stmt, params),
579 crate::velesql::DmlStatement::InsertEdge(stmt) => self.execute_insert_edge(stmt),
580 crate::velesql::DmlStatement::Delete(stmt) => self.execute_delete(stmt),
581 crate::velesql::DmlStatement::DeleteEdge(stmt) => self.execute_delete_edge(stmt),
582 crate::velesql::DmlStatement::SelectEdges(stmt) => self.execute_select_edges(stmt),
583 crate::velesql::DmlStatement::InsertNode(stmt) => self.execute_insert_node(stmt),
584 }
585 }
586}