velesdb_core/database/query_engine.rs
1//! Query execution: `execute_query`, `explain_query`, `explain_analyze_query`, plan caching, and DML dispatch.
2
3use crate::velesql::{
4 ActualStats, AdminStatement, DdlStatement, DmlStatement, ExplainOutput, IntrospectionStatement,
5 Query, TrainStatement,
6};
7use crate::{Error, Result, SearchResult};
8
9use super::Database;
10
11/// Statement type classification for dispatch routing.
12enum StatementType<'a> {
13 Admin(&'a AdminStatement),
14 Introspection(&'a IntrospectionStatement),
15 Ddl(&'a DdlStatement),
16 Train(&'a TrainStatement),
17 Dml(&'a DmlStatement),
18 Match,
19 Select,
20}
21
22/// Classifies a query into its statement type for routing.
23fn classify_statement(query: &Query) -> StatementType<'_> {
24 if let Some(admin) = query.admin.as_ref() {
25 return StatementType::Admin(admin);
26 }
27 if let Some(intro) = query.introspection.as_ref() {
28 return StatementType::Introspection(intro);
29 }
30 if let Some(ddl) = query.ddl.as_ref() {
31 return StatementType::Ddl(ddl);
32 }
33 if let Some(train) = query.train.as_ref() {
34 return StatementType::Train(train);
35 }
36 if let Some(dml) = query.dml.as_ref() {
37 return StatementType::Dml(dml);
38 }
39 if query.is_match_query() {
40 return StatementType::Match;
41 }
42 StatementType::Select
43}
44
45impl Database {
46 /// Produces a canonical JSON string for a `serde_json::Value`.
47 ///
48 /// Recursively sorts the keys of every JSON object so that two values
49 /// representing the same logical structure always produce identical bytes,
50 /// regardless of the `HashMap` iteration order used during serialization.
51 ///
52 /// This is required because `FusionConfig::params` and
53 /// `TrainStatement::params` are `HashMap`-backed; `serde_json` serialises
54 /// them in hash-order, which is non-deterministic across invocations.
55 fn canonical_json(value: serde_json::Value) -> serde_json::Value {
56 match value {
57 serde_json::Value::Object(map) => {
58 // Without the `preserve_order` feature flag, `serde_json::Map` is already
59 // backed by `BTreeMap` and therefore already sorted. This explicit sort
60 // step is kept as defense-in-depth: if `preserve_order` is ever enabled
61 // in `Cargo.toml` (which switches the backing store to `IndexMap` and
62 // preserves insertion order), the canonical key ordering is still upheld
63 // without any change to this function.
64 let sorted: serde_json::Map<String, serde_json::Value> = map
65 .into_iter()
66 .map(|(k, v)| (k, Self::canonical_json(v)))
67 .collect::<std::collections::BTreeMap<_, _>>()
68 .into_iter()
69 .collect();
70 serde_json::Value::Object(sorted)
71 }
72 serde_json::Value::Array(arr) => {
73 serde_json::Value::Array(arr.into_iter().map(Self::canonical_json).collect())
74 }
75 other => other,
76 }
77 }
78
79 /// Builds a deterministic cache key for a query (CACHE-02).
80 ///
81 /// Serialises the query to canonical JSON (object keys sorted recursively),
82 /// reads the current `schema_version`, and gathers per-collection
83 /// `write_generation` counters (sorted by collection name) to form a
84 /// `PlanKey`.
85 ///
86 /// # Why canonical JSON instead of `Debug`
87 ///
88 /// `format!("{query:?}")` is non-deterministic when the `Query` AST
89 /// contains `HashMap`-backed fields (`FusionConfig::params`,
90 /// `TrainStatement::params`) because `HashMap` iteration order is not
91 /// guaranteed across invocations. Canonical JSON with sorted object keys
92 /// is stable and produces the same byte sequence for logically identical
93 /// queries.
94 #[must_use]
95 pub fn build_plan_key(&self, query: &crate::velesql::Query) -> crate::cache::PlanKey {
96 use std::hash::{BuildHasher, Hasher};
97
98 // Serialise via serde_json, then canonicalise (sort object keys) before hashing.
99 // Fallback to Debug representation if serialization fails (should never happen in
100 // practice since all Query fields are Serialize, but erring on the side of liveness).
101 let query_text = serde_json::to_value(query)
102 .map(Self::canonical_json)
103 .and_then(|v| serde_json::to_string(&v))
104 .unwrap_or_else(|_| format!("{query:?}"));
105
106 let mut hasher = rustc_hash::FxBuildHasher.build_hasher();
107 hasher.write(query_text.as_bytes());
108 let query_hash = hasher.finish();
109
110 let schema_version = self.schema_version();
111 let collection_names = Self::referenced_collection_names(query);
112
113 // Build generations vector in sorted collection order.
114 let collection_generations: smallvec::SmallVec<[u64; 4]> = collection_names
115 .iter()
116 .map(|name| self.collection_write_generation(name).unwrap_or(0))
117 .collect();
118
119 // Issue #608: parallel vector of analyze generations so that running
120 // ANALYZE alone (no data mutation) still flips the cache key and
121 // rebuilds plans with the fresh calibrated cost estimates.
122 let analyze_generations: smallvec::SmallVec<[u64; 4]> = collection_names
123 .iter()
124 .map(|name| self.collection_analyze_generation(name).unwrap_or(0))
125 .collect();
126
127 crate::cache::PlanKey {
128 query_hash,
129 schema_version,
130 collection_generations,
131 analyze_generations,
132 }
133 }
134
135 /// Returns the query plan for a query, with cache status populated (CACHE-02).
136 ///
137 /// If the plan is cached, returns it with `cache_hit: Some(true)` and
138 /// `plan_reuse_count` set. Otherwise generates a fresh plan with
139 /// `cache_hit: Some(false)`.
140 ///
141 /// # Design decision: `explain_query` does not populate the cache
142 ///
143 /// `explain_query` intentionally does **not** insert a new plan into the
144 /// compiled plan cache. EXPLAIN is a diagnostic operation; allowing it to
145 /// influence cache state would make cache metrics (hit/miss ratios,
146 /// `plan_reuse_count`) unreliable because EXPLAIN calls would be
147 /// indistinguishable from real execution hits. Only `execute_query` is
148 /// authorised to write to the cache.
149 ///
150 /// # Errors
151 ///
152 /// Returns an error if the query is invalid.
153 pub fn explain_query(
154 &self,
155 query: &crate::velesql::Query,
156 ) -> Result<crate::velesql::QueryPlan> {
157 crate::velesql::QueryValidator::validate(query).map_err(|e| Error::Query(e.to_string()))?;
158
159 let plan_key = self.build_plan_key(query);
160
161 if let Some(cached) = self.compiled_plan_cache.get(&plan_key) {
162 let mut plan = cached.plan.clone();
163 plan.cache_hit = Some(true);
164 plan.plan_reuse_count = Some(
165 cached
166 .reuse_count
167 .load(std::sync::atomic::Ordering::Relaxed),
168 );
169 return Ok(plan);
170 }
171
172 let mut plan = self.build_plan_with_stats(query);
173 plan.cache_hit = Some(false);
174 plan.plan_reuse_count = Some(0);
175 Ok(plan)
176 }
177
178 /// Builds a query plan, resolving calibrated collection statistics AND
179 /// the registered secondary index set from the registry when available
180 /// (#471 — EXPLAIN real costs, #607 — `IndexLookup` wiring).
181 ///
182 /// The returned plan's `estimated_cost_ms` and `filter_strategy` are
183 /// calibrated via `CostEstimator` when stats exist for the query's
184 /// primary collection. Falls back to heuristics otherwise. The
185 /// `indexed_fields` argument is populated from
186 /// `Database::indexed_fields_for` so that `IndexLookup` nodes appear
187 /// in the EXPLAIN tree for WHERE clauses targeting indexed columns.
188 fn build_plan_with_stats(&self, query: &crate::velesql::Query) -> crate::velesql::QueryPlan {
189 let primary = &query.select.from;
190 let core_stats = self.get_collection_stats(primary).ok().flatten();
191 let indexed = self.indexed_fields_for(primary);
192 crate::velesql::QueryPlan::from_query_with_stats(query, &indexed, core_stats.as_ref())
193 }
194
195 /// Executes a query with instrumentation and returns both plan and actual stats.
196 ///
197 /// Unlike `explain_query` (plan only) and `execute_query` (results only),
198 /// this method returns the full [`ExplainOutput`] with measured statistics.
199 /// The normal `execute_query` path is untouched — zero overhead on
200 /// non-ANALYZE queries.
201 ///
202 /// # Errors
203 ///
204 /// Returns an error if the query is invalid or execution fails.
205 pub fn explain_analyze_query(
206 &self,
207 query: &Query,
208 params: &std::collections::HashMap<String, serde_json::Value>,
209 ) -> Result<ExplainOutput> {
210 crate::velesql::QueryValidator::validate(query).map_err(|e| Error::Query(e.to_string()))?;
211
212 let plan = self.explain_query(query)?;
213 let start = std::time::Instant::now();
214 let results = self.execute_query(query, params)?;
215 let elapsed = start.elapsed();
216
217 let actual_rows = results.len() as u64;
218 let actual_time_ms = elapsed.as_secs_f64() * 1000.0;
219 let is_match = query.is_match_query();
220 let (nodes_visited, edges_traversed) = if is_match {
221 (actual_rows, actual_rows)
222 } else {
223 (0, 0)
224 };
225
226 let stats = ActualStats {
227 actual_rows,
228 actual_time_ms,
229 loops: 1,
230 nodes_visited,
231 edges_traversed,
232 };
233
234 let node_stats =
235 crate::velesql::build_leaf_node_stats(&plan.root, actual_rows, actual_time_ms);
236 Ok(ExplainOutput::with_stats(plan, stats, node_stats))
237 }
238
239 /// Executes a `VelesQL` query with database-level JOIN resolution.
240 ///
241 /// This method resolves JOIN target collections from the database registry
242 /// and executes JOIN runtime in sequence. Query plans are cached and
243 /// reused for identical queries against unchanged collections (CACHE-02).
244 ///
245 /// # Errors
246 ///
247 /// Returns an error if the base collection or any JOIN collection is missing.
248 pub fn execute_query(
249 &self,
250 query: &crate::velesql::Query,
251 params: &std::collections::HashMap<String, serde_json::Value>,
252 ) -> Result<Vec<SearchResult>> {
253 crate::velesql::QueryValidator::validate(query).map_err(|e| Error::Query(e.to_string()))?;
254
255 if let Some(results) = self.dispatch_non_select(query, params)? {
256 return Ok(results);
257 }
258
259 // Build plan key and check cache WITHOUT recording hit/miss metrics (CACHE-02).
260 //
261 // `contains()` is used instead of `get().is_some()` so that this
262 // existence check does not increment the hit/miss counters or
263 // `reuse_count`. Only `explain_query` (which surfaces these values to
264 // callers) should call `get()`.
265 let pre_exec_key = self.build_plan_key(query);
266 let is_cached = self.compiled_plan_cache.contains(&pre_exec_key);
267
268 let results = self.execute_select_query(query, params)?;
269
270 // Populate cache on miss (CACHE-02).
271 //
272 // C-1 TOCTOU fix: rebuild the plan key AFTER execution. Between the
273 // pre-execution `contains()` check and here, a concurrent writer may
274 // have bumped a collection's `write_generation` (e.g. via `upsert` on
275 // another thread). Rebuilding the key captures the post-execution
276 // state, so the cached plan is associated with the generation that was
277 // live when the plan was actually compiled — not a potentially stale
278 // pre-execution snapshot.
279 if !is_cached {
280 self.populate_plan_cache(query);
281 }
282
283 Ok(results)
284 }
285
286 /// Classifies and dispatches non-SELECT statement types.
287 ///
288 /// Returns `Ok(Some(results))` if handled, `Ok(None)` for SELECT queries.
289 fn dispatch_non_select(
290 &self,
291 query: &crate::velesql::Query,
292 params: &std::collections::HashMap<String, serde_json::Value>,
293 ) -> Result<Option<Vec<SearchResult>>> {
294 // Classify the statement type (at most one is Some).
295 let stmt_type = classify_statement(query);
296 match stmt_type {
297 StatementType::Admin(admin) => Ok(Some(self.execute_admin(admin)?)),
298 StatementType::Introspection(intro) => Ok(Some(self.execute_introspection(intro)?)),
299 StatementType::Ddl(ddl) => Ok(Some(self.execute_ddl(ddl)?)),
300 StatementType::Train(train) => Ok(Some(self.execute_train(train)?)),
301 StatementType::Dml(dml) => Ok(Some(self.execute_dml(dml, params)?)),
302 StatementType::Match => {
303 // Route MATCH queries to the target collection.
304 // Resolution order:
305 // 1. select.from (e.g. "SELECT * FROM kg WHERE MATCH ...")
306 // 2. "_collection" key in params (programmatic API)
307 // 3. Error with guidance
308 let collection_name = if !query.select.from.is_empty() {
309 query.select.from.clone()
310 } else if let Some(serde_json::Value::String(name)) = params.get("_collection") {
311 name.clone()
312 } else {
313 return Err(Error::Query(
314 "MATCH query requires a target collection. Either use \
315 SELECT ... FROM <collection> WHERE MATCH ..., or pass \
316 {\"_collection\": \"name\"} in params."
317 .to_string(),
318 ));
319 };
320 let coll = self.resolve_collection(&collection_name)?;
321 let mut results = coll.execute_query(query, params)?;
322
323 // Cross-collection enrichment: if any node pattern has a
324 // @collection annotation, look up payloads from those
325 // collections and merge into the projected fields.
326 if let Some(mc) = &query.match_clause {
327 self.enrich_match_results_cross_collection(mc, &mut results);
328 }
329
330 Ok(Some(results))
331 }
332 StatementType::Select => Ok(None),
333 }
334 }
335
336 /// Executes the SELECT portion of a query, resolving JOINs if present.
337 fn execute_select_query(
338 &self,
339 query: &crate::velesql::Query,
340 params: &std::collections::HashMap<String, serde_json::Value>,
341 ) -> Result<Vec<SearchResult>> {
342 // EPIC-040 US-006: For compound queries, strip LIMIT from each operand so
343 // the set operation sees the full result sets. The final LIMIT is applied
344 // once on the merged output (SQL-standard behaviour).
345 // Use MAX_LIMIT (not None) to avoid the default-10 cap downstream.
346 let compound_limit = Some(100_000_u64);
347 let left_results = if query.compound.is_some() {
348 let mut left_query = query.clone();
349 left_query.select.limit = compound_limit;
350 self.execute_single_select(&left_query, params)?
351 } else {
352 return self.execute_single_select(query, params);
353 };
354
355 // compound is guaranteed Some here (non-compound returns above).
356 if let Some(ref compound) = query.compound {
357 let mut accumulated = left_results;
358 for (operator, right_select) in &compound.operations {
359 let mut right_query = crate::velesql::Query::new_select(right_select.clone());
360 right_query.select.limit = compound_limit;
361 let right_results = self.execute_single_select(&right_query, params)?;
362 accumulated = crate::collection::search::query::set_operations::apply_set_operation(
363 accumulated,
364 right_results,
365 *operator,
366 );
367 }
368 // SQL-standard: LIMIT from the left (outer) SELECT applies to the final result.
369 if let Some(limit) = query.select.limit {
370 accumulated.truncate(usize::try_from(limit).unwrap_or(usize::MAX));
371 }
372 return Ok(accumulated);
373 }
374
375 Ok(left_results)
376 }
377
378 /// Collects sorted, deduplicated collection names referenced by a query,
379 /// including all compound operands (UNION, INTERSECT, EXCEPT).
380 ///
381 /// RF-DEDUP: Shared by `build_plan_key` and `populate_plan_cache`, which
382 /// both need the same sorted collection-name list from the query AST.
383 fn referenced_collection_names(query: &crate::velesql::Query) -> Vec<String> {
384 let mut names = vec![query.select.from.clone()];
385 for join in &query.select.joins {
386 names.push(join.table.clone());
387 }
388 if let Some(ref compound) = query.compound {
389 for (_, right_select) in &compound.operations {
390 names.push(right_select.from.clone());
391 for join in &right_select.joins {
392 names.push(join.table.clone());
393 }
394 }
395 }
396 names.sort();
397 names.dedup();
398 names
399 }
400
401 /// Resolves a collection by name from all typed registries.
402 ///
403 /// Priority: vector collections first, then graph, then metadata.
404 /// Returns the inner `Collection` for query execution.
405 pub(super) fn resolve_collection(&self, name: &str) -> Result<crate::collection::Collection> {
406 if let Some(vc) = self.get_vector_collection(name) {
407 return Ok(vc.inner);
408 }
409 if let Some(gc) = self.get_graph_collection(name) {
410 return Ok(gc.inner);
411 }
412 if let Some(mc) = self.get_metadata_collection(name) {
413 return Ok(mc.inner);
414 }
415 Err(Error::CollectionNotFound(name.to_string()))
416 }
417
418 /// Resolves a collection that supports write operations (INSERT/UPDATE/TRAIN).
419 ///
420 /// Checks vector, graph, and metadata collections. Metadata-only collections
421 /// support INSERT/UPDATE for metadata fields (no vectors).
422 pub(super) fn resolve_writable_collection(
423 &self,
424 name: &str,
425 ) -> Result<crate::collection::Collection> {
426 if let Some(vc) = self.get_vector_collection(name) {
427 return Ok(vc.inner);
428 }
429 if let Some(gc) = self.get_graph_collection(name) {
430 return Ok(gc.inner);
431 }
432 if let Some(mc) = self.get_metadata_collection(name) {
433 return Ok(mc.inner);
434 }
435 Err(Error::CollectionNotFound(name.to_string()))
436 }
437
438 /// Executes a single SELECT (no compound), resolving JOINs if present.
439 ///
440 /// Orchestrates filter pushdown and join strategy selection:
441 /// 1. Analyze WHERE for pushdown-eligible conditions
442 /// 2. Strip pushed conditions from base query
443 /// 3. For each JOIN: lookup, filtered, or full `ColumnStore` path
444 /// 4. Apply post-join filters (cross-source predicates)
445 fn execute_single_select(
446 &self,
447 query: &crate::velesql::Query,
448 params: &std::collections::HashMap<String, serde_json::Value>,
449 ) -> Result<Vec<SearchResult>> {
450 let base_collection = self.resolve_collection(&query.select.from)?;
451
452 let mut single_query = query.clone();
453 single_query.compound = None;
454
455 if single_query.select.joins.is_empty() {
456 return base_collection.execute_query(&single_query, params);
457 }
458
459 let analysis = Self::analyze_join_pushdown_for_select(&query.select);
460
461 let pushed = analysis.column_store_filters.clone();
462
463 single_query.select.joins.clear();
464 if !pushed.is_empty() {
465 single_query.select.where_clause =
466 Self::strip_pushed_conditions(query.select.where_clause.as_ref(), &pushed);
467 }
468
469 let mut results = base_collection.execute_query(&single_query, params)?;
470 for join in &query.select.joins {
471 results = self.execute_single_join(&results, join, &pushed)?;
472 }
473
474 // Apply post-join filters: cross-source predicates that reference
475 // columns from both the base collection and joined ColumnStore tables.
476 if !analysis.post_join_filters.is_empty() {
477 results = Self::apply_post_join_filters(
478 &base_collection,
479 results,
480 &analysis.post_join_filters,
481 params,
482 &query.select.from_alias,
483 )?;
484 }
485
486 Ok(results)
487 }
488
489 // NOTE: analyze_join_pushdown_for_select, apply_post_join_filters
490 // moved to join_pushdown.rs (NLOC/file reduction)
491
492 /// Inserts a compiled plan into the cache after a cache miss (CACHE-02).
493 fn populate_plan_cache(&self, query: &crate::velesql::Query) {
494 let compiled = std::sync::Arc::new(crate::cache::CompiledPlan {
495 plan: self.build_plan_with_stats(query),
496 referenced_collections: Self::referenced_collection_names(query),
497 compiled_at: std::time::Instant::now(),
498 reuse_count: std::sync::atomic::AtomicU64::new(0),
499 });
500 // Rebuild key after execution to reflect current write_generation (C-1).
501 let post_exec_key = self.build_plan_key(query);
502 self.compiled_plan_cache.insert(post_exec_key, compiled);
503 }
504
505 /// Dispatches a DML statement (INSERT, UPSERT, UPDATE, DELETE, or edge mutations).
506 pub(super) fn execute_dml(
507 &self,
508 dml: &crate::velesql::DmlStatement,
509 params: &std::collections::HashMap<String, serde_json::Value>,
510 ) -> Result<Vec<SearchResult>> {
511 match dml {
512 crate::velesql::DmlStatement::Insert(stmt)
513 | crate::velesql::DmlStatement::Upsert(stmt) => self.execute_insert(stmt, params),
514 crate::velesql::DmlStatement::Update(stmt) => self.execute_update(stmt, params),
515 crate::velesql::DmlStatement::InsertEdge(stmt) => self.execute_insert_edge(stmt),
516 crate::velesql::DmlStatement::Delete(stmt) => self.execute_delete(stmt),
517 crate::velesql::DmlStatement::DeleteEdge(stmt) => self.execute_delete_edge(stmt),
518 crate::velesql::DmlStatement::SelectEdges(stmt) => self.execute_select_edges(stmt),
519 crate::velesql::DmlStatement::InsertNode(stmt) => self.execute_insert_node(stmt),
520 }
521 }
522}