// Source: velesdb_core/database/query_engine.rs
//! Query execution: `execute_query`, `explain_query`, `explain_analyze_query`, plan caching, and DML dispatch.

3use crate::velesql::{
4    ActualStats, AdminStatement, DdlStatement, DmlStatement, ExplainOutput, IntrospectionStatement,
5    Query, TrainStatement,
6};
7use crate::{Error, Result, SearchResult};
8
9use super::Database;
10
11/// Statement type classification for dispatch routing.
12enum StatementType<'a> {
13    Admin(&'a AdminStatement),
14    Introspection(&'a IntrospectionStatement),
15    Ddl(&'a DdlStatement),
16    Train(&'a TrainStatement),
17    Dml(&'a DmlStatement),
18    Match,
19    Select,
20}
21
22/// Classifies a query into its statement type for routing.
23fn classify_statement(query: &Query) -> StatementType<'_> {
24    if let Some(admin) = query.admin.as_ref() {
25        return StatementType::Admin(admin);
26    }
27    if let Some(intro) = query.introspection.as_ref() {
28        return StatementType::Introspection(intro);
29    }
30    if let Some(ddl) = query.ddl.as_ref() {
31        return StatementType::Ddl(ddl);
32    }
33    if let Some(train) = query.train.as_ref() {
34        return StatementType::Train(train);
35    }
36    if let Some(dml) = query.dml.as_ref() {
37        return StatementType::Dml(dml);
38    }
39    if query.is_match_query() {
40        return StatementType::Match;
41    }
42    StatementType::Select
43}
44
45impl Database {
46    /// Produces a canonical JSON string for a `serde_json::Value`.
47    ///
48    /// Recursively sorts the keys of every JSON object so that two values
49    /// representing the same logical structure always produce identical bytes,
50    /// regardless of the `HashMap` iteration order used during serialization.
51    ///
52    /// This is required because `FusionConfig::params` and
53    /// `TrainStatement::params` are `HashMap`-backed; `serde_json` serialises
54    /// them in hash-order, which is non-deterministic across invocations.
55    fn canonical_json(value: serde_json::Value) -> serde_json::Value {
56        match value {
57            serde_json::Value::Object(map) => {
58                // Without the `preserve_order` feature flag, `serde_json::Map` is already
59                // backed by `BTreeMap` and therefore already sorted. This explicit sort
60                // step is kept as defense-in-depth: if `preserve_order` is ever enabled
61                // in `Cargo.toml` (which switches the backing store to `IndexMap` and
62                // preserves insertion order), the canonical key ordering is still upheld
63                // without any change to this function.
64                let sorted: serde_json::Map<String, serde_json::Value> = map
65                    .into_iter()
66                    .map(|(k, v)| (k, Self::canonical_json(v)))
67                    .collect::<std::collections::BTreeMap<_, _>>()
68                    .into_iter()
69                    .collect();
70                serde_json::Value::Object(sorted)
71            }
72            serde_json::Value::Array(arr) => {
73                serde_json::Value::Array(arr.into_iter().map(Self::canonical_json).collect())
74            }
75            other => other,
76        }
77    }
78
79    /// Builds a deterministic cache key for a query (CACHE-02).
80    ///
81    /// Serialises the query to canonical JSON (object keys sorted recursively),
82    /// reads the current `schema_version`, and gathers per-collection
83    /// `write_generation` counters (sorted by collection name) to form a
84    /// `PlanKey`.
85    ///
86    /// # Why canonical JSON instead of `Debug`
87    ///
88    /// `format!("{query:?}")` is non-deterministic when the `Query` AST
89    /// contains `HashMap`-backed fields (`FusionConfig::params`,
90    /// `TrainStatement::params`) because `HashMap` iteration order is not
91    /// guaranteed across invocations. Canonical JSON with sorted object keys
92    /// is stable and produces the same byte sequence for logically identical
93    /// queries.
94    #[must_use]
95    pub fn build_plan_key(&self, query: &crate::velesql::Query) -> crate::cache::PlanKey {
96        use std::hash::{BuildHasher, Hasher};
97
98        // Serialise via serde_json, then canonicalise (sort object keys) before hashing.
99        // Fallback to Debug representation if serialization fails (should never happen in
100        // practice since all Query fields are Serialize, but erring on the side of liveness).
101        let query_text = serde_json::to_value(query)
102            .map(Self::canonical_json)
103            .and_then(|v| serde_json::to_string(&v))
104            .unwrap_or_else(|_| format!("{query:?}"));
105
106        let mut hasher = rustc_hash::FxBuildHasher.build_hasher();
107        hasher.write(query_text.as_bytes());
108        let query_hash = hasher.finish();
109
110        let schema_version = self.schema_version();
111        let collection_names = Self::referenced_collection_names(query);
112
113        // Build generations vector in sorted collection order.
114        let collection_generations: smallvec::SmallVec<[u64; 4]> = collection_names
115            .iter()
116            .map(|name| self.collection_write_generation(name).unwrap_or(0))
117            .collect();
118
119        // Issue #608: parallel vector of analyze generations so that running
120        // ANALYZE alone (no data mutation) still flips the cache key and
121        // rebuilds plans with the fresh calibrated cost estimates.
122        let analyze_generations: smallvec::SmallVec<[u64; 4]> = collection_names
123            .iter()
124            .map(|name| self.collection_analyze_generation(name).unwrap_or(0))
125            .collect();
126
127        crate::cache::PlanKey {
128            // Issue #902: store the canonical text so PlanKey equality is
129            // collision-safe. query_hash stays a Hash accelerator only.
130            query_text: query_text.into(),
131            query_hash,
132            schema_version,
133            collection_generations,
134            analyze_generations,
135        }
136    }
137
138    /// Returns the query plan for a query, with cache status populated (CACHE-02).
139    ///
140    /// If the plan is cached, returns it with `cache_hit: Some(true)` and
141    /// `plan_reuse_count` set. Otherwise generates a fresh plan with
142    /// `cache_hit: Some(false)`.
143    ///
144    /// # Design decision: `explain_query` does not populate the cache
145    ///
146    /// `explain_query` intentionally does **not** insert a new plan into the
147    /// compiled plan cache. EXPLAIN is a diagnostic operation; allowing it to
148    /// influence cache state would make cache metrics (hit/miss ratios,
149    /// `plan_reuse_count`) unreliable because EXPLAIN calls would be
150    /// indistinguishable from real execution hits. Only `execute_query` is
151    /// authorised to write to the cache.
152    ///
153    /// # Errors
154    ///
155    /// Returns an error if the query is invalid.
156    pub fn explain_query(
157        &self,
158        query: &crate::velesql::Query,
159    ) -> Result<crate::velesql::QueryPlan> {
160        crate::velesql::QueryValidator::validate(query).map_err(|e| Error::Query(e.to_string()))?;
161
162        let plan_key = self.build_plan_key(query);
163
164        if let Some(cached) = self.compiled_plan_cache.get(&plan_key) {
165            let mut plan = cached.plan.clone();
166            plan.cache_hit = Some(true);
167            plan.plan_reuse_count = Some(
168                cached
169                    .reuse_count
170                    .load(std::sync::atomic::Ordering::Relaxed),
171            );
172            return Ok(plan);
173        }
174
175        let mut plan = self.build_plan_with_stats(query);
176        plan.cache_hit = Some(false);
177        plan.plan_reuse_count = Some(0);
178        Ok(plan)
179    }
180
181    /// Builds a query plan, resolving calibrated collection statistics AND
182    /// the registered secondary index set from the registry when available
183    /// (#471 — EXPLAIN real costs, #607 — `IndexLookup` wiring).
184    ///
185    /// The returned plan's `estimated_cost_ms` and `filter_strategy` are
186    /// calibrated via `CostEstimator` when stats exist for the query's
187    /// primary collection. Falls back to heuristics otherwise. The
188    /// `indexed_fields` argument is populated from
189    /// `Database::indexed_fields_for` so that `IndexLookup` nodes appear
190    /// in the EXPLAIN tree for WHERE clauses targeting indexed columns.
191    fn build_plan_with_stats(&self, query: &crate::velesql::Query) -> crate::velesql::QueryPlan {
192        let primary = &query.select.from;
193        let core_stats = self.get_collection_stats(primary).ok().flatten();
194        let indexed = self.indexed_fields_for(primary);
195        crate::velesql::QueryPlan::from_query_with_stats(query, &indexed, core_stats.as_ref())
196    }
197
198    /// Executes a query with instrumentation and returns both plan and actual stats.
199    ///
200    /// Unlike `explain_query` (plan only) and `execute_query` (results only),
201    /// this method returns the full [`ExplainOutput`] with measured statistics.
202    /// The normal `execute_query` path is untouched — zero overhead on
203    /// non-ANALYZE queries.
204    ///
205    /// # Errors
206    ///
207    /// Returns an error if the query is invalid or execution fails.
208    pub fn explain_analyze_query(
209        &self,
210        query: &Query,
211        params: &std::collections::HashMap<String, serde_json::Value>,
212    ) -> Result<ExplainOutput> {
213        crate::velesql::QueryValidator::validate(query).map_err(|e| Error::Query(e.to_string()))?;
214
215        let plan = self.explain_query(query)?;
216        let start = std::time::Instant::now();
217        let results = self.execute_query(query, params)?;
218        let elapsed = start.elapsed();
219
220        let actual_rows = results.len() as u64;
221        let actual_time_ms = elapsed.as_secs_f64() * 1000.0;
222        let is_match = query.is_match_query();
223        let (nodes_visited, edges_traversed) = if is_match {
224            (actual_rows, actual_rows)
225        } else {
226            (0, 0)
227        };
228
229        let stats = ActualStats {
230            actual_rows,
231            actual_time_ms,
232            loops: 1,
233            nodes_visited,
234            edges_traversed,
235        };
236
237        let node_stats =
238            crate::velesql::build_leaf_node_stats(&plan.root, actual_rows, actual_time_ms);
239        Ok(ExplainOutput::with_stats(plan, stats, node_stats))
240    }
241
242    /// Executes a `VelesQL` query with database-level JOIN resolution.
243    ///
244    /// This method resolves JOIN target collections from the database registry
245    /// and executes JOIN runtime in sequence. Query plans are cached and
246    /// reused for identical queries against unchanged collections (CACHE-02).
247    ///
248    /// # Errors
249    ///
250    /// Returns an error if the base collection or any JOIN collection is missing.
251    pub fn execute_query(
252        &self,
253        query: &crate::velesql::Query,
254        params: &std::collections::HashMap<String, serde_json::Value>,
255    ) -> Result<Vec<SearchResult>> {
256        crate::velesql::QueryValidator::validate(query).map_err(|e| Error::Query(e.to_string()))?;
257
258        if let Some(results) = self.dispatch_non_select(query, params)? {
259            return Ok(results);
260        }
261
262        // Build plan key and check cache WITHOUT recording hit/miss metrics (CACHE-02).
263        //
264        // `contains()` is used instead of `get().is_some()` so that this
265        // existence check does not increment the hit/miss counters or
266        // `reuse_count`. Only `explain_query` (which surfaces these values to
267        // callers) should call `get()`.
268        let pre_exec_key = self.build_plan_key(query);
269        let is_cached = self.compiled_plan_cache.contains(&pre_exec_key);
270
271        let results = self.execute_select_query(query, params)?;
272
273        // Populate cache on miss (CACHE-02).
274        //
275        // C-1 TOCTOU fix: rebuild the plan key AFTER execution. Between the
276        // pre-execution `contains()` check and here, a concurrent writer may
277        // have bumped a collection's `write_generation` (e.g. via `upsert` on
278        // another thread). Rebuilding the key captures the post-execution
279        // state, so the cached plan is associated with the generation that was
280        // live when the plan was actually compiled — not a potentially stale
281        // pre-execution snapshot.
282        if !is_cached {
283            self.populate_plan_cache(query);
284        }
285
286        Ok(results)
287    }
288
289    /// Classifies and dispatches non-SELECT statement types.
290    ///
291    /// Returns `Ok(Some(results))` if handled, `Ok(None)` for SELECT queries.
292    fn dispatch_non_select(
293        &self,
294        query: &crate::velesql::Query,
295        params: &std::collections::HashMap<String, serde_json::Value>,
296    ) -> Result<Option<Vec<SearchResult>>> {
297        // Classify the statement type (at most one is Some).
298        let stmt_type = classify_statement(query);
299        match stmt_type {
300            StatementType::Admin(admin) => Ok(Some(self.execute_admin(admin)?)),
301            StatementType::Introspection(intro) => Ok(Some(self.execute_introspection(intro)?)),
302            StatementType::Ddl(ddl) => Ok(Some(self.execute_ddl(ddl)?)),
303            StatementType::Train(train) => Ok(Some(self.execute_train(train)?)),
304            StatementType::Dml(dml) => Ok(Some(self.execute_dml(dml, params)?)),
305            StatementType::Match => {
306                // Route MATCH queries to the target collection.
307                // Resolution order:
308                // 1. select.from (e.g. "SELECT * FROM kg WHERE MATCH ...")
309                // 2. "_collection" key in params (programmatic API)
310                // 3. Error with guidance
311                let collection_name = if !query.select.from.is_empty() {
312                    query.select.from.clone()
313                } else if let Some(serde_json::Value::String(name)) = params.get("_collection") {
314                    name.clone()
315                } else {
316                    return Err(Error::Query(
317                        "MATCH query requires a target collection. Either use \
318                         SELECT ... FROM <collection> WHERE MATCH ..., or pass \
319                         {\"_collection\": \"name\"} in params."
320                            .to_string(),
321                    ));
322                };
323                let coll = self.resolve_collection(&collection_name)?;
324                let mut results = coll.execute_query(query, params)?;
325
326                // Cross-collection enrichment: if any node pattern has a
327                // @collection annotation, look up payloads from those
328                // collections and merge into the projected fields.
329                if let Some(mc) = &query.match_clause {
330                    self.enrich_match_results_cross_collection(mc, &mut results);
331                }
332
333                Ok(Some(results))
334            }
335            StatementType::Select => Ok(None),
336        }
337    }
338
339    /// Executes the SELECT portion of a query, resolving JOINs if present.
340    fn execute_select_query(
341        &self,
342        query: &crate::velesql::Query,
343        params: &std::collections::HashMap<String, serde_json::Value>,
344    ) -> Result<Vec<SearchResult>> {
345        // EPIC-040 US-006: For compound queries, strip LIMIT from each operand so
346        // the set operation sees the full result sets.  The final LIMIT is applied
347        // once on the merged output (SQL-standard behaviour).
348        // Use MAX_LIMIT (not None) to avoid the default-10 cap downstream.
349        const COMPOUND_LIMIT: usize = 100_000;
350        let compound_limit = Some(COMPOUND_LIMIT as u64); // 100_000 fits u64 exactly.
351        let left_results = if query.compound.is_some() {
352            let mut left_query = query.clone();
353            left_query.select.limit = compound_limit;
354            self.execute_single_select(&left_query, params)?
355        } else {
356            return self.execute_single_select(query, params);
357        };
358
359        // compound is guaranteed Some here (non-compound returns above).
360        if let Some(ref compound) = query.compound {
361            let mut accumulated = left_results;
362            for (operator, right_select) in &compound.operations {
363                let mut right_query = crate::velesql::Query::new_select(right_select.clone());
364                right_query.select.limit = compound_limit;
365                let right_results = self.execute_single_select(&right_query, params)?;
366                accumulated = crate::collection::search::query::set_operations::apply_set_operation(
367                    accumulated,
368                    right_results,
369                    *operator,
370                    // Intermediate ops keep the server-side ceiling: truncating to the
371                    // user LIMIT here would drop rows a later chained set op still needs.
372                    COMPOUND_LIMIT,
373                );
374            }
375            // SQL-standard: LIMIT from the left (outer) SELECT applies to the final result.
376            if let Some(limit) = query.select.limit {
377                accumulated.truncate(usize::try_from(limit).unwrap_or(usize::MAX));
378            }
379            return Ok(accumulated);
380        }
381
382        Ok(left_results)
383    }
384
385    /// Collects sorted, deduplicated collection names referenced by a query,
386    /// including all compound operands (UNION, INTERSECT, EXCEPT).
387    ///
388    /// RF-DEDUP: Shared by `build_plan_key` and `populate_plan_cache`, which
389    /// both need the same sorted collection-name list from the query AST.
390    fn referenced_collection_names(query: &crate::velesql::Query) -> Vec<String> {
391        let mut names = vec![query.select.from.clone()];
392        for join in &query.select.joins {
393            names.push(join.table.clone());
394        }
395        if let Some(ref compound) = query.compound {
396            for (_, right_select) in &compound.operations {
397                names.push(right_select.from.clone());
398                for join in &right_select.joins {
399                    names.push(join.table.clone());
400                }
401            }
402        }
403        names.sort();
404        names.dedup();
405        names
406    }
407
408    /// Resolves a collection by name from all typed registries.
409    ///
410    /// Priority: vector collections first, then graph, then metadata.
411    /// Returns the inner `Collection` for query execution.
412    pub(super) fn resolve_collection(&self, name: &str) -> Result<crate::collection::Collection> {
413        if let Some(vc) = self.get_vector_collection(name) {
414            return Ok(vc.inner);
415        }
416        if let Some(gc) = self.get_graph_collection(name) {
417            return Ok(gc.inner);
418        }
419        if let Some(mc) = self.get_metadata_collection(name) {
420            return Ok(mc.inner);
421        }
422        Err(Error::CollectionNotFound(name.to_string()))
423    }
424
425    /// Resolves a collection that supports write operations (INSERT/UPDATE/TRAIN).
426    ///
427    /// Checks vector, graph, and metadata collections. Metadata-only collections
428    /// support INSERT/UPDATE for metadata fields (no vectors).
429    pub(super) fn resolve_writable_collection(
430        &self,
431        name: &str,
432    ) -> Result<crate::collection::Collection> {
433        if let Some(vc) = self.get_vector_collection(name) {
434            return Ok(vc.inner);
435        }
436        if let Some(gc) = self.get_graph_collection(name) {
437            return Ok(gc.inner);
438        }
439        if let Some(mc) = self.get_metadata_collection(name) {
440            return Ok(mc.inner);
441        }
442        Err(Error::CollectionNotFound(name.to_string()))
443    }
444
445    /// Executes a single SELECT (no compound), resolving JOINs if present.
446    ///
447    /// Orchestrates filter pushdown and join strategy selection:
448    /// 1. Analyze WHERE for pushdown-eligible conditions
449    /// 2. Strip pushed conditions from base query
450    /// 3. For each JOIN: lookup, filtered, or full `ColumnStore` path
451    /// 4. Apply post-join filters (cross-source predicates)
452    fn execute_single_select(
453        &self,
454        query: &crate::velesql::Query,
455        params: &std::collections::HashMap<String, serde_json::Value>,
456    ) -> Result<Vec<SearchResult>> {
457        let base_collection = self.resolve_collection(&query.select.from)?;
458
459        let mut single_query = query.clone();
460        single_query.compound = None;
461
462        if single_query.select.joins.is_empty() {
463            return base_collection.execute_query(&single_query, params);
464        }
465
466        let analysis = Self::prepare_join_pushdown(&mut single_query, params)?;
467        let pushed = analysis.column_store_filters.clone();
468
469        let row_budget = Self::join_row_budget(&query.select, &analysis);
470
471        let mut results = base_collection.execute_query(&single_query, params)?;
472        for join in &query.select.joins {
473            results = self.execute_single_join(&results, join, &pushed, row_budget)?;
474        }
475
476        // Apply post-join filters: cross-source predicates that reference
477        // columns from both the base collection and joined ColumnStore tables.
478        if !analysis.post_join_filters.is_empty() {
479            results = Self::apply_post_join_filters(
480                &base_collection,
481                results,
482                &analysis.post_join_filters,
483                params,
484                &query.select.from_alias,
485            )?;
486        }
487
488        Ok(results)
489    }
490
491    /// Resolves WHERE parameters, runs pushdown analysis, and strips pushed
492    /// conditions from the base query (JOIN path of `execute_single_select`).
493    ///
494    /// Parameter placeholders are resolved before the analysis so pushed-down
495    /// filters never silently convert them to NULL at the `ColumnStore` layer
496    /// (the collection pipeline resolves its own copy independently).
497    fn prepare_join_pushdown(
498        single_query: &mut crate::velesql::Query,
499        params: &std::collections::HashMap<String, serde_json::Value>,
500    ) -> Result<crate::collection::search::query::pushdown::PushdownAnalysis> {
501        if let Some(cond) = single_query.select.where_clause.take() {
502            single_query.select.where_clause = Some(
503                crate::collection::Collection::resolve_condition_params(&cond, params)?,
504            );
505        }
506
507        let analysis = Self::analyze_join_pushdown_for_select(&single_query.select);
508
509        let resolved_where = single_query.select.where_clause.clone();
510        single_query.select.joins.clear();
511        if !analysis.column_store_filters.is_empty() {
512            single_query.select.where_clause = Self::strip_pushed_conditions(
513                resolved_where.as_ref(),
514                &analysis.column_store_filters,
515            );
516        }
517        Ok(analysis)
518    }
519
520    /// Computes the bound on joined rows to materialize.
521    ///
522    /// When the query has an explicit LIMIT, no post-join filters, and no
523    /// ORDER BY (which could reorder past the window), the bound is the
524    /// effective `LIMIT + OFFSET`. GROUP BY / HAVING / DISTINCT also disqualify
525    /// the bounded shape: SQL LIMIT bounds output *groups/rows*, not input rows,
526    /// so truncating joined input to `LIMIT` would drop rows that belong to
527    /// groups still inside the window. Otherwise downstream stages may reorder or
528    /// drop rows, so we fall back to the conservative server-side ceiling
529    /// [`JOIN_ROW_CEILING`] — still bounding OOM without affecting correctness.
530    pub(super) fn join_row_budget(
531        select: &crate::velesql::SelectStatement,
532        analysis: &crate::collection::search::query::pushdown::PushdownAnalysis,
533    ) -> usize {
534        use crate::collection::search::query::JOIN_ROW_CEILING;
535        use crate::velesql::DistinctMode;
536        let bounded_shape = analysis.post_join_filters.is_empty()
537            && select.order_by.is_none()
538            && select.group_by.is_none()
539            && select.having.is_none()
540            && select.distinct == DistinctMode::None;
541        match select.limit {
542            Some(limit) if bounded_shape => {
543                let limit = usize::try_from(limit).unwrap_or(JOIN_ROW_CEILING);
544                let offset = select
545                    .offset
546                    .map_or(0, |o| usize::try_from(o).unwrap_or(JOIN_ROW_CEILING));
547                limit.saturating_add(offset).min(JOIN_ROW_CEILING)
548            }
549            _ => JOIN_ROW_CEILING,
550        }
551    }
552
553    // NOTE: analyze_join_pushdown_for_select, apply_post_join_filters
554    // moved to join_pushdown.rs (NLOC/file reduction)
555
556    /// Inserts a compiled plan into the cache after a cache miss (CACHE-02).
557    fn populate_plan_cache(&self, query: &crate::velesql::Query) {
558        let compiled = std::sync::Arc::new(crate::cache::CompiledPlan {
559            plan: self.build_plan_with_stats(query),
560            referenced_collections: Self::referenced_collection_names(query),
561            compiled_at: std::time::Instant::now(),
562            reuse_count: std::sync::atomic::AtomicU64::new(0),
563        });
564        // Rebuild key after execution to reflect current write_generation (C-1).
565        let post_exec_key = self.build_plan_key(query);
566        self.compiled_plan_cache.insert(post_exec_key, compiled);
567    }
568
569    /// Dispatches a DML statement (INSERT, UPSERT, UPDATE, DELETE, or edge mutations).
570    pub(super) fn execute_dml(
571        &self,
572        dml: &crate::velesql::DmlStatement,
573        params: &std::collections::HashMap<String, serde_json::Value>,
574    ) -> Result<Vec<SearchResult>> {
575        match dml {
576            crate::velesql::DmlStatement::Insert(stmt)
577            | crate::velesql::DmlStatement::Upsert(stmt) => self.execute_insert(stmt, params),
578            crate::velesql::DmlStatement::Update(stmt) => self.execute_update(stmt, params),
579            crate::velesql::DmlStatement::InsertEdge(stmt) => self.execute_insert_edge(stmt),
580            crate::velesql::DmlStatement::Delete(stmt) => self.execute_delete(stmt),
581            crate::velesql::DmlStatement::DeleteEdge(stmt) => self.execute_delete_edge(stmt),
582            crate::velesql::DmlStatement::SelectEdges(stmt) => self.execute_select_edges(stmt),
583            crate::velesql::DmlStatement::InsertNode(stmt) => self.execute_insert_node(stmt),
584        }
585    }
586}