Skip to main content

velesdb_core/database/
query_engine.rs

1//! Query execution: `execute_query`, `explain_query`, `explain_analyze_query`, plan caching, and DML dispatch.
2
3use crate::velesql::{
4    ActualStats, AdminStatement, DdlStatement, DmlStatement, ExplainOutput, IntrospectionStatement,
5    Query, TrainStatement,
6};
7use crate::{Error, Result, SearchResult};
8
9use super::Database;
10
/// Statement type classification for dispatch routing.
///
/// Produced by `classify_statement` and consumed by
/// `Database::dispatch_non_select`. The statement-carrying variants borrow
/// the corresponding optional field of the `Query` they were classified from.
enum StatementType<'a> {
    /// `query.admin` was set.
    Admin(&'a AdminStatement),
    /// `query.introspection` was set.
    Introspection(&'a IntrospectionStatement),
    /// `query.ddl` was set.
    Ddl(&'a DdlStatement),
    /// `query.train` was set.
    Train(&'a TrainStatement),
    /// `query.dml` was set.
    Dml(&'a DmlStatement),
    /// No statement field was set and `query.is_match_query()` returned true.
    Match,
    /// Fallback: a plain SELECT (possibly compound and/or with JOINs).
    Select,
}
21
22/// Classifies a query into its statement type for routing.
23fn classify_statement(query: &Query) -> StatementType<'_> {
24    if let Some(admin) = query.admin.as_ref() {
25        return StatementType::Admin(admin);
26    }
27    if let Some(intro) = query.introspection.as_ref() {
28        return StatementType::Introspection(intro);
29    }
30    if let Some(ddl) = query.ddl.as_ref() {
31        return StatementType::Ddl(ddl);
32    }
33    if let Some(train) = query.train.as_ref() {
34        return StatementType::Train(train);
35    }
36    if let Some(dml) = query.dml.as_ref() {
37        return StatementType::Dml(dml);
38    }
39    if query.is_match_query() {
40        return StatementType::Match;
41    }
42    StatementType::Select
43}
44
45impl Database {
46    /// Produces a canonical JSON string for a `serde_json::Value`.
47    ///
48    /// Recursively sorts the keys of every JSON object so that two values
49    /// representing the same logical structure always produce identical bytes,
50    /// regardless of the `HashMap` iteration order used during serialization.
51    ///
52    /// This is required because `FusionConfig::params` and
53    /// `TrainStatement::params` are `HashMap`-backed; `serde_json` serialises
54    /// them in hash-order, which is non-deterministic across invocations.
55    fn canonical_json(value: serde_json::Value) -> serde_json::Value {
56        match value {
57            serde_json::Value::Object(map) => {
58                // Without the `preserve_order` feature flag, `serde_json::Map` is already
59                // backed by `BTreeMap` and therefore already sorted. This explicit sort
60                // step is kept as defense-in-depth: if `preserve_order` is ever enabled
61                // in `Cargo.toml` (which switches the backing store to `IndexMap` and
62                // preserves insertion order), the canonical key ordering is still upheld
63                // without any change to this function.
64                let sorted: serde_json::Map<String, serde_json::Value> = map
65                    .into_iter()
66                    .map(|(k, v)| (k, Self::canonical_json(v)))
67                    .collect::<std::collections::BTreeMap<_, _>>()
68                    .into_iter()
69                    .collect();
70                serde_json::Value::Object(sorted)
71            }
72            serde_json::Value::Array(arr) => {
73                serde_json::Value::Array(arr.into_iter().map(Self::canonical_json).collect())
74            }
75            other => other,
76        }
77    }
78
79    /// Builds a deterministic cache key for a query (CACHE-02).
80    ///
81    /// Serialises the query to canonical JSON (object keys sorted recursively),
82    /// reads the current `schema_version`, and gathers per-collection
83    /// `write_generation` counters (sorted by collection name) to form a
84    /// `PlanKey`.
85    ///
86    /// # Why canonical JSON instead of `Debug`
87    ///
88    /// `format!("{query:?}")` is non-deterministic when the `Query` AST
89    /// contains `HashMap`-backed fields (`FusionConfig::params`,
90    /// `TrainStatement::params`) because `HashMap` iteration order is not
91    /// guaranteed across invocations. Canonical JSON with sorted object keys
92    /// is stable and produces the same byte sequence for logically identical
93    /// queries.
94    #[must_use]
95    pub fn build_plan_key(&self, query: &crate::velesql::Query) -> crate::cache::PlanKey {
96        use std::hash::{BuildHasher, Hasher};
97
98        // Serialise via serde_json, then canonicalise (sort object keys) before hashing.
99        // Fallback to Debug representation if serialization fails (should never happen in
100        // practice since all Query fields are Serialize, but erring on the side of liveness).
101        let query_text = serde_json::to_value(query)
102            .map(Self::canonical_json)
103            .and_then(|v| serde_json::to_string(&v))
104            .unwrap_or_else(|_| format!("{query:?}"));
105
106        let mut hasher = rustc_hash::FxBuildHasher.build_hasher();
107        hasher.write(query_text.as_bytes());
108        let query_hash = hasher.finish();
109
110        let schema_version = self.schema_version();
111        let collection_names = Self::referenced_collection_names(query);
112
113        // Build generations vector in sorted collection order.
114        let collection_generations: smallvec::SmallVec<[u64; 4]> = collection_names
115            .iter()
116            .map(|name| self.collection_write_generation(name).unwrap_or(0))
117            .collect();
118
119        // Issue #608: parallel vector of analyze generations so that running
120        // ANALYZE alone (no data mutation) still flips the cache key and
121        // rebuilds plans with the fresh calibrated cost estimates.
122        let analyze_generations: smallvec::SmallVec<[u64; 4]> = collection_names
123            .iter()
124            .map(|name| self.collection_analyze_generation(name).unwrap_or(0))
125            .collect();
126
127        crate::cache::PlanKey {
128            query_hash,
129            schema_version,
130            collection_generations,
131            analyze_generations,
132        }
133    }
134
135    /// Returns the query plan for a query, with cache status populated (CACHE-02).
136    ///
137    /// If the plan is cached, returns it with `cache_hit: Some(true)` and
138    /// `plan_reuse_count` set. Otherwise generates a fresh plan with
139    /// `cache_hit: Some(false)`.
140    ///
141    /// # Design decision: `explain_query` does not populate the cache
142    ///
143    /// `explain_query` intentionally does **not** insert a new plan into the
144    /// compiled plan cache. EXPLAIN is a diagnostic operation; allowing it to
145    /// influence cache state would make cache metrics (hit/miss ratios,
146    /// `plan_reuse_count`) unreliable because EXPLAIN calls would be
147    /// indistinguishable from real execution hits. Only `execute_query` is
148    /// authorised to write to the cache.
149    ///
150    /// # Errors
151    ///
152    /// Returns an error if the query is invalid.
153    pub fn explain_query(
154        &self,
155        query: &crate::velesql::Query,
156    ) -> Result<crate::velesql::QueryPlan> {
157        crate::velesql::QueryValidator::validate(query).map_err(|e| Error::Query(e.to_string()))?;
158
159        let plan_key = self.build_plan_key(query);
160
161        if let Some(cached) = self.compiled_plan_cache.get(&plan_key) {
162            let mut plan = cached.plan.clone();
163            plan.cache_hit = Some(true);
164            plan.plan_reuse_count = Some(
165                cached
166                    .reuse_count
167                    .load(std::sync::atomic::Ordering::Relaxed),
168            );
169            return Ok(plan);
170        }
171
172        let mut plan = self.build_plan_with_stats(query);
173        plan.cache_hit = Some(false);
174        plan.plan_reuse_count = Some(0);
175        Ok(plan)
176    }
177
178    /// Builds a query plan, resolving calibrated collection statistics AND
179    /// the registered secondary index set from the registry when available
180    /// (#471 — EXPLAIN real costs, #607 — `IndexLookup` wiring).
181    ///
182    /// The returned plan's `estimated_cost_ms` and `filter_strategy` are
183    /// calibrated via `CostEstimator` when stats exist for the query's
184    /// primary collection. Falls back to heuristics otherwise. The
185    /// `indexed_fields` argument is populated from
186    /// `Database::indexed_fields_for` so that `IndexLookup` nodes appear
187    /// in the EXPLAIN tree for WHERE clauses targeting indexed columns.
188    fn build_plan_with_stats(&self, query: &crate::velesql::Query) -> crate::velesql::QueryPlan {
189        let primary = &query.select.from;
190        let core_stats = self.get_collection_stats(primary).ok().flatten();
191        let indexed = self.indexed_fields_for(primary);
192        crate::velesql::QueryPlan::from_query_with_stats(query, &indexed, core_stats.as_ref())
193    }
194
195    /// Executes a query with instrumentation and returns both plan and actual stats.
196    ///
197    /// Unlike `explain_query` (plan only) and `execute_query` (results only),
198    /// this method returns the full [`ExplainOutput`] with measured statistics.
199    /// The normal `execute_query` path is untouched — zero overhead on
200    /// non-ANALYZE queries.
201    ///
202    /// # Errors
203    ///
204    /// Returns an error if the query is invalid or execution fails.
205    pub fn explain_analyze_query(
206        &self,
207        query: &Query,
208        params: &std::collections::HashMap<String, serde_json::Value>,
209    ) -> Result<ExplainOutput> {
210        crate::velesql::QueryValidator::validate(query).map_err(|e| Error::Query(e.to_string()))?;
211
212        let plan = self.explain_query(query)?;
213        let start = std::time::Instant::now();
214        let results = self.execute_query(query, params)?;
215        let elapsed = start.elapsed();
216
217        let actual_rows = results.len() as u64;
218        let actual_time_ms = elapsed.as_secs_f64() * 1000.0;
219        let is_match = query.is_match_query();
220        let (nodes_visited, edges_traversed) = if is_match {
221            (actual_rows, actual_rows)
222        } else {
223            (0, 0)
224        };
225
226        let stats = ActualStats {
227            actual_rows,
228            actual_time_ms,
229            loops: 1,
230            nodes_visited,
231            edges_traversed,
232        };
233
234        let node_stats =
235            crate::velesql::build_leaf_node_stats(&plan.root, actual_rows, actual_time_ms);
236        Ok(ExplainOutput::with_stats(plan, stats, node_stats))
237    }
238
239    /// Executes a `VelesQL` query with database-level JOIN resolution.
240    ///
241    /// This method resolves JOIN target collections from the database registry
242    /// and executes JOIN runtime in sequence. Query plans are cached and
243    /// reused for identical queries against unchanged collections (CACHE-02).
244    ///
245    /// # Errors
246    ///
247    /// Returns an error if the base collection or any JOIN collection is missing.
248    pub fn execute_query(
249        &self,
250        query: &crate::velesql::Query,
251        params: &std::collections::HashMap<String, serde_json::Value>,
252    ) -> Result<Vec<SearchResult>> {
253        crate::velesql::QueryValidator::validate(query).map_err(|e| Error::Query(e.to_string()))?;
254
255        if let Some(results) = self.dispatch_non_select(query, params)? {
256            return Ok(results);
257        }
258
259        // Build plan key and check cache WITHOUT recording hit/miss metrics (CACHE-02).
260        //
261        // `contains()` is used instead of `get().is_some()` so that this
262        // existence check does not increment the hit/miss counters or
263        // `reuse_count`. Only `explain_query` (which surfaces these values to
264        // callers) should call `get()`.
265        let pre_exec_key = self.build_plan_key(query);
266        let is_cached = self.compiled_plan_cache.contains(&pre_exec_key);
267
268        let results = self.execute_select_query(query, params)?;
269
270        // Populate cache on miss (CACHE-02).
271        //
272        // C-1 TOCTOU fix: rebuild the plan key AFTER execution. Between the
273        // pre-execution `contains()` check and here, a concurrent writer may
274        // have bumped a collection's `write_generation` (e.g. via `upsert` on
275        // another thread). Rebuilding the key captures the post-execution
276        // state, so the cached plan is associated with the generation that was
277        // live when the plan was actually compiled — not a potentially stale
278        // pre-execution snapshot.
279        if !is_cached {
280            self.populate_plan_cache(query);
281        }
282
283        Ok(results)
284    }
285
286    /// Classifies and dispatches non-SELECT statement types.
287    ///
288    /// Returns `Ok(Some(results))` if handled, `Ok(None)` for SELECT queries.
289    fn dispatch_non_select(
290        &self,
291        query: &crate::velesql::Query,
292        params: &std::collections::HashMap<String, serde_json::Value>,
293    ) -> Result<Option<Vec<SearchResult>>> {
294        // Classify the statement type (at most one is Some).
295        let stmt_type = classify_statement(query);
296        match stmt_type {
297            StatementType::Admin(admin) => Ok(Some(self.execute_admin(admin)?)),
298            StatementType::Introspection(intro) => Ok(Some(self.execute_introspection(intro)?)),
299            StatementType::Ddl(ddl) => Ok(Some(self.execute_ddl(ddl)?)),
300            StatementType::Train(train) => Ok(Some(self.execute_train(train)?)),
301            StatementType::Dml(dml) => Ok(Some(self.execute_dml(dml, params)?)),
302            StatementType::Match => {
303                // Route MATCH queries to the target collection.
304                // Resolution order:
305                // 1. select.from (e.g. "SELECT * FROM kg WHERE MATCH ...")
306                // 2. "_collection" key in params (programmatic API)
307                // 3. Error with guidance
308                let collection_name = if !query.select.from.is_empty() {
309                    query.select.from.clone()
310                } else if let Some(serde_json::Value::String(name)) = params.get("_collection") {
311                    name.clone()
312                } else {
313                    return Err(Error::Query(
314                        "MATCH query requires a target collection. Either use \
315                         SELECT ... FROM <collection> WHERE MATCH ..., or pass \
316                         {\"_collection\": \"name\"} in params."
317                            .to_string(),
318                    ));
319                };
320                let coll = self.resolve_collection(&collection_name)?;
321                let mut results = coll.execute_query(query, params)?;
322
323                // Cross-collection enrichment: if any node pattern has a
324                // @collection annotation, look up payloads from those
325                // collections and merge into the projected fields.
326                if let Some(mc) = &query.match_clause {
327                    self.enrich_match_results_cross_collection(mc, &mut results);
328                }
329
330                Ok(Some(results))
331            }
332            StatementType::Select => Ok(None),
333        }
334    }
335
336    /// Executes the SELECT portion of a query, resolving JOINs if present.
337    fn execute_select_query(
338        &self,
339        query: &crate::velesql::Query,
340        params: &std::collections::HashMap<String, serde_json::Value>,
341    ) -> Result<Vec<SearchResult>> {
342        // EPIC-040 US-006: For compound queries, strip LIMIT from each operand so
343        // the set operation sees the full result sets.  The final LIMIT is applied
344        // once on the merged output (SQL-standard behaviour).
345        // Use MAX_LIMIT (not None) to avoid the default-10 cap downstream.
346        let compound_limit = Some(100_000_u64);
347        let left_results = if query.compound.is_some() {
348            let mut left_query = query.clone();
349            left_query.select.limit = compound_limit;
350            self.execute_single_select(&left_query, params)?
351        } else {
352            return self.execute_single_select(query, params);
353        };
354
355        // compound is guaranteed Some here (non-compound returns above).
356        if let Some(ref compound) = query.compound {
357            let mut accumulated = left_results;
358            for (operator, right_select) in &compound.operations {
359                let mut right_query = crate::velesql::Query::new_select(right_select.clone());
360                right_query.select.limit = compound_limit;
361                let right_results = self.execute_single_select(&right_query, params)?;
362                accumulated = crate::collection::search::query::set_operations::apply_set_operation(
363                    accumulated,
364                    right_results,
365                    *operator,
366                );
367            }
368            // SQL-standard: LIMIT from the left (outer) SELECT applies to the final result.
369            if let Some(limit) = query.select.limit {
370                accumulated.truncate(usize::try_from(limit).unwrap_or(usize::MAX));
371            }
372            return Ok(accumulated);
373        }
374
375        Ok(left_results)
376    }
377
378    /// Collects sorted, deduplicated collection names referenced by a query,
379    /// including all compound operands (UNION, INTERSECT, EXCEPT).
380    ///
381    /// RF-DEDUP: Shared by `build_plan_key` and `populate_plan_cache`, which
382    /// both need the same sorted collection-name list from the query AST.
383    fn referenced_collection_names(query: &crate::velesql::Query) -> Vec<String> {
384        let mut names = vec![query.select.from.clone()];
385        for join in &query.select.joins {
386            names.push(join.table.clone());
387        }
388        if let Some(ref compound) = query.compound {
389            for (_, right_select) in &compound.operations {
390                names.push(right_select.from.clone());
391                for join in &right_select.joins {
392                    names.push(join.table.clone());
393                }
394            }
395        }
396        names.sort();
397        names.dedup();
398        names
399    }
400
401    /// Resolves a collection by name from all typed registries.
402    ///
403    /// Priority: vector collections first, then graph, then metadata.
404    /// Returns the inner `Collection` for query execution.
405    pub(super) fn resolve_collection(&self, name: &str) -> Result<crate::collection::Collection> {
406        if let Some(vc) = self.get_vector_collection(name) {
407            return Ok(vc.inner);
408        }
409        if let Some(gc) = self.get_graph_collection(name) {
410            return Ok(gc.inner);
411        }
412        if let Some(mc) = self.get_metadata_collection(name) {
413            return Ok(mc.inner);
414        }
415        Err(Error::CollectionNotFound(name.to_string()))
416    }
417
418    /// Resolves a collection that supports write operations (INSERT/UPDATE/TRAIN).
419    ///
420    /// Checks vector, graph, and metadata collections. Metadata-only collections
421    /// support INSERT/UPDATE for metadata fields (no vectors).
422    pub(super) fn resolve_writable_collection(
423        &self,
424        name: &str,
425    ) -> Result<crate::collection::Collection> {
426        if let Some(vc) = self.get_vector_collection(name) {
427            return Ok(vc.inner);
428        }
429        if let Some(gc) = self.get_graph_collection(name) {
430            return Ok(gc.inner);
431        }
432        if let Some(mc) = self.get_metadata_collection(name) {
433            return Ok(mc.inner);
434        }
435        Err(Error::CollectionNotFound(name.to_string()))
436    }
437
438    /// Executes a single SELECT (no compound), resolving JOINs if present.
439    ///
440    /// Orchestrates filter pushdown and join strategy selection:
441    /// 1. Analyze WHERE for pushdown-eligible conditions
442    /// 2. Strip pushed conditions from base query
443    /// 3. For each JOIN: lookup, filtered, or full `ColumnStore` path
444    /// 4. Apply post-join filters (cross-source predicates)
445    fn execute_single_select(
446        &self,
447        query: &crate::velesql::Query,
448        params: &std::collections::HashMap<String, serde_json::Value>,
449    ) -> Result<Vec<SearchResult>> {
450        let base_collection = self.resolve_collection(&query.select.from)?;
451
452        let mut single_query = query.clone();
453        single_query.compound = None;
454
455        if single_query.select.joins.is_empty() {
456            return base_collection.execute_query(&single_query, params);
457        }
458
459        let analysis = Self::analyze_join_pushdown_for_select(&query.select);
460
461        let pushed = analysis.column_store_filters.clone();
462
463        single_query.select.joins.clear();
464        if !pushed.is_empty() {
465            single_query.select.where_clause =
466                Self::strip_pushed_conditions(query.select.where_clause.as_ref(), &pushed);
467        }
468
469        let mut results = base_collection.execute_query(&single_query, params)?;
470        for join in &query.select.joins {
471            results = self.execute_single_join(&results, join, &pushed)?;
472        }
473
474        // Apply post-join filters: cross-source predicates that reference
475        // columns from both the base collection and joined ColumnStore tables.
476        if !analysis.post_join_filters.is_empty() {
477            results = Self::apply_post_join_filters(
478                &base_collection,
479                results,
480                &analysis.post_join_filters,
481                params,
482                &query.select.from_alias,
483            )?;
484        }
485
486        Ok(results)
487    }
488
489    // NOTE: analyze_join_pushdown_for_select, apply_post_join_filters
490    // moved to join_pushdown.rs (NLOC/file reduction)
491
492    /// Inserts a compiled plan into the cache after a cache miss (CACHE-02).
493    fn populate_plan_cache(&self, query: &crate::velesql::Query) {
494        let compiled = std::sync::Arc::new(crate::cache::CompiledPlan {
495            plan: self.build_plan_with_stats(query),
496            referenced_collections: Self::referenced_collection_names(query),
497            compiled_at: std::time::Instant::now(),
498            reuse_count: std::sync::atomic::AtomicU64::new(0),
499        });
500        // Rebuild key after execution to reflect current write_generation (C-1).
501        let post_exec_key = self.build_plan_key(query);
502        self.compiled_plan_cache.insert(post_exec_key, compiled);
503    }
504
505    /// Dispatches a DML statement (INSERT, UPSERT, UPDATE, DELETE, or edge mutations).
506    pub(super) fn execute_dml(
507        &self,
508        dml: &crate::velesql::DmlStatement,
509        params: &std::collections::HashMap<String, serde_json::Value>,
510    ) -> Result<Vec<SearchResult>> {
511        match dml {
512            crate::velesql::DmlStatement::Insert(stmt)
513            | crate::velesql::DmlStatement::Upsert(stmt) => self.execute_insert(stmt, params),
514            crate::velesql::DmlStatement::Update(stmt) => self.execute_update(stmt, params),
515            crate::velesql::DmlStatement::InsertEdge(stmt) => self.execute_insert_edge(stmt),
516            crate::velesql::DmlStatement::Delete(stmt) => self.execute_delete(stmt),
517            crate::velesql::DmlStatement::DeleteEdge(stmt) => self.execute_delete_edge(stmt),
518            crate::velesql::DmlStatement::SelectEdges(stmt) => self.execute_select_edges(stmt),
519            crate::velesql::DmlStatement::InsertNode(stmt) => self.execute_insert_node(stmt),
520        }
521    }
522}